Getting live speech into Emacs with Deepgram's streaming API
| speechtotext, emacs, speech, streaming

Update: Reorganized the code to call a list of functions and pass them the recognition results. Added Etherpad. Took out the mode; I'll just use the functions. Related: getting live speech from Emacs into Etherpad
This is a quick demonstration of using Deepgram's streaming API to do speech recognition live. It isn't as accurate as OpenAI Whisper but since Whisper doesn't have a streaming API, it'll do for now. I can correct misrecognized words manually. I tend to talk really quickly, so it displays the words per minute in my modeline. I put the words into an Org Mode buffer so I can toggle headings with avy and cycle visibility. When I'm done, it saves the text, JSON, and WAV for further processing. I think it'll be handy to have a quick way to take live notes during interviews or when I'm thinking out loud. Could be fun!
I'm still getting some weirdness when the mode turns on when I don't expect it, so that's something to look into. Maybe I won't use it as a mode for now. I'll just use my-live-speech-start and my-live-speech-stop.
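Something like this would make them quicker to reach (just a sketch; the keys here are arbitrary):

;; Sketch: arbitrary keybindings for starting and stopping live captions.
(global-set-key (kbd "C-c v s") #'my-live-speech-start)
(global-set-key (kbd "C-c v q") #'my-live-speech-stop)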
General code
(defvar my-live-speech-buffer "*Speech*")
(defvar my-live-speech-process nil)
(defvar my-live-speech-output-buffer "*Speech JSON*")
(defvar my-live-speech-functions
  '(my-live-speech-display-in-speech-buffer
    my-live-speech-display-wpm
    my-live-speech-append-to-etherpad)
  "Functions to call with one argument, the recognition results.")

(defun my-live-speech-start ()
  "Turn on live captions."
  (interactive)
  (with-current-buffer (get-buffer-create my-live-speech-buffer)
    (unless (process-live-p my-live-speech-process)
      (let ((default-directory "~/proj/deepgram-live"))
        (message "%s" default-directory)
        (with-current-buffer (get-buffer-create my-live-speech-output-buffer)
          (erase-buffer))
        (setq my-live-speech-recent-words nil
              my-live-speech-wpm-string "READY ")
        (setq my-live-speech-process
              (make-process
               :command '("bash" "run.sh")
               :name "speech"
               :filter 'my-live-speech-json-filter
               :sentinel #'my-live-speech-process-sentinel
               :buffer my-live-speech-output-buffer)))
      (org-mode))
    (display-buffer (current-buffer))))

(defun my-live-speech-stop ()
  (interactive)
  (if (process-live-p my-live-speech-process)
      (kill-process my-live-speech-process))
  (setq my-live-speech-wpm-string nil))

;; (define-minor-mode my-live-speech-mode
;;   "Show live speech and display WPM.
;; Need to check how to reliably turn this on and off."
;;   :global t :group 'sachac
;;   (if my-live-speech-mode
;;       (my-live-speech-start)
;;     (my-live-speech-stop)
;;     (setq my-live-speech-wpm-string nil)))

;; based on subed-mpv::client-filter
(defun my-live-speech-handle-json (line)
  "Process the JSON object in LINE."
  (run-hook-with-args 'my-live-speech-functions
                      (json-parse-string line :object-type 'alist)))

(defun my-live-speech-process-sentinel (proc event)
  (when (string-match "finished" event)
    (my-live-speech-stop)
    ;; (my-live-speech-mode -1)
    ))

(defun my-live-speech-json-filter (proc string)
  (when (buffer-live-p (process-buffer proc))
    (with-current-buffer (process-buffer proc)
      (let* ((proc-mark (process-mark proc))
             (moving (= (point) proc-mark)))
        ;; insert the output
        (save-excursion
          (goto-char proc-mark)
          (insert string)
          (set-marker proc-mark (point)))
        (if moving (goto-char proc-mark))
        ;; process and remove all complete lines of JSON (lines are complete if ending with \n)
        (let ((pos (point-min)))
          (while (progn (goto-char pos) (end-of-line) (equal (following-char) ?\n))
            (let* ((end (point))
                   (line (buffer-substring pos end)))
              (delete-region pos (+ end 1))
              (with-current-buffer (get-buffer my-live-speech-buffer)
                (my-live-speech-handle-json line)))))))))
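Since my-live-speech-functions is just a list of functions that each get the parsed recognition results, it's easy to hook in extra behaviour. Here's a minimal sketch (the function name and log file path are made up) that appends every transcript to a file:

;; Sketch: log each transcript line to a file. The name and path are made up.
(defun my-live-speech-log-to-file (recognition-results)
  (let-alist recognition-results
    (when .transcript
      (append-to-file (concat .transcript "\n") nil "~/live-speech.log"))))
(add-to-list 'my-live-speech-functions #'my-live-speech-log-to-file)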
Python code based on the Deepgram streaming test suite:
Very rough app.py
# Based on streaming-test-suite
# https://developers.deepgram.com/docs/getting-started-with-the-streaming-test-suite

import pyaudio
import asyncio
import json
import os
import websockets
from datetime import datetime
import wave
import sys

startTime = datetime.now()
key = os.environ['DEEPGRAM_API_KEY']
live_json = os.environ.get('LIVE_CAPTIONS_JSON', True)

all_mic_data = []
all_transcripts = []
all_words = []

FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK = 8000

audio_queue = asyncio.Queue()
REALTIME_RESOLUTION = 0.250
SAMPLE_SIZE = 0


def save_info():
    global SAMPLE_SIZE
    base = startTime.strftime('%Y%m%d%H%M')
    wave_file_path = os.path.abspath(f"{base}.wav")
    wave_file = wave.open(wave_file_path, "wb")
    wave_file.setnchannels(CHANNELS)
    wave_file.setsampwidth(SAMPLE_SIZE)
    wave_file.setframerate(RATE)
    wave_file.writeframes(b"".join(all_mic_data))
    wave_file.close()
    with open(f"{base}.txt", "w") as f:
        f.write("\n".join(all_transcripts))
    with open(f"{base}.json", "w") as f:
        f.write(json.dumps(all_words))
    if live_json:
        print(f'{{"msg": "🟢 Saved to {base}.txt , {base}.json , {base}.wav", "base": "{base}"}}')
    else:
        print(f"🟢 Saved to {base}.txt , {base}.json , {base}.wav")


# Used for microphone streaming only.
def mic_callback(input_data, frame_count, time_info, status_flag):
    audio_queue.put_nowait(input_data)
    return (input_data, pyaudio.paContinue)


async def run(key, method="mic", format="text", **kwargs):
    deepgram_url = f'wss://api.deepgram.com/v1/listen?punctuate=true&smart_format=true&utterances=true&encoding=linear16&sample_rate=16000'

    async with websockets.connect(
        deepgram_url, extra_headers={"Authorization": "Token {}".format(key)}
    ) as ws:

        async def sender(ws):
            try:
                while True:
                    mic_data = await audio_queue.get()
                    all_mic_data.append(mic_data)
                    await ws.send(mic_data)
            except websockets.exceptions.ConnectionClosedOK:
                await ws.send(json.dumps({"type": "CloseStream"}))
                if live_json:
                    print('{"msg": "Closed."}')
                else:
                    print("Closed.")

        async def receiver(ws):
            global all_words
            """Print out the messages received from the server."""
            first_message = True
            first_transcript = True
            transcript = ""

            async for msg in ws:
                res = json.loads(msg)
                if first_message:
                    first_message = False
                try:
                    # handle local server messages
                    if res.get("msg"):
                        if live_json:
                            print(json.dumps(res))
                        else:
                            print(res["msg"])
                    if res.get("is_final"):
                        transcript = (
                            res.get("channel", {})
                            .get("alternatives", [{}])[0]
                            .get("transcript", "")
                        )
                        if transcript != "":
                            if first_transcript:
                                first_transcript = False
                            if live_json:
                                print(json.dumps(res.get("channel", {}).get("alternatives", [{}])[0]))
                            else:
                                print(transcript)
                            all_transcripts.append(transcript)
                            all_words = all_words + res.get("channel", {}).get("alternatives", [{}])[0].get("words", [])

                        # if using the microphone, close stream if user says "goodbye"
                        if method == "mic" and "goodbye" in transcript.lower():
                            await ws.send(json.dumps({"type": "CloseStream"}))
                            if live_json:
                                print('{"msg": "Done."}')
                            else:
                                print("Done.")

                    # handle end of stream
                    if res.get("created"):
                        save_info()
                except KeyError:
                    print(f"🔴 ERROR: Received unexpected API response! {msg}")

        # Set up microphone if streaming from mic
        async def microphone():
            audio = pyaudio.PyAudio()
            stream = audio.open(
                format=FORMAT,
                channels=CHANNELS,
                rate=RATE,
                input=True,
                frames_per_buffer=CHUNK,
                stream_callback=mic_callback,
            )
            stream.start_stream()

            global SAMPLE_SIZE
            SAMPLE_SIZE = audio.get_sample_size(FORMAT)

            while stream.is_active():
                await asyncio.sleep(0.1)

            stream.stop_stream()
            stream.close()

        functions = [
            asyncio.ensure_future(sender(ws)),
            asyncio.ensure_future(receiver(ws)),
        ]
        functions.append(asyncio.ensure_future(microphone()))
        if live_json:
            print('{"msg": "Ready."}')
        else:
            print("🟢 Ready.")
        await asyncio.gather(*functions)


def main():
    """Entrypoint for the example."""
    # Parse the command-line arguments.
    try:
        asyncio.run(run(key, "mic", "text"))
    except websockets.exceptions.InvalidStatusCode as e:
        print(f'🔴 ERROR: Could not connect to Deepgram! {e.headers.get("dg-error")}')
        print(
            f'🔴 Please contact Deepgram Support (developers@deepgram.com) with request ID {e.headers.get("dg-request-id")}'
        )
        return
    except websockets.exceptions.ConnectionClosedError as e:
        error_description = f"Unknown websocket error."
        print(
            f"🔴 ERROR: Deepgram connection unexpectedly closed with code {e.code} and payload {e.reason}"
        )

        if e.reason == "DATA-0000":
            error_description = "The payload cannot be decoded as audio. It is either not audio data or is a codec unsupported by Deepgram."
        elif e.reason == "NET-0000":
            error_description = "The service has not transmitted a Text frame to the client within the timeout window. This may indicate an issue internally in Deepgram's systems or could be due to Deepgram not receiving enough audio data to transcribe a frame."
        elif e.reason == "NET-0001":
            error_description = "The service has not received a Binary frame from the client within the timeout window. This may indicate an internal issue in Deepgram's systems, the client's systems, or the network connecting them."

        print(f"🔴 {error_description}")
        # TODO: update with link to streaming troubleshooting page once available
        # print(f'🔴 Refer to our troubleshooting suggestions: ')
        print(
            f"🔴 Please contact Deepgram Support (developers@deepgram.com) with the request ID listed above."
        )
        return
    except websockets.exceptions.ConnectionClosedOK:
        return
    except Exception as e:
        print(f"🔴 ERROR: Something went wrong! {e}")
        save_info()
        return


if __name__ == "__main__":
    sys.exit(main() or 0)
The Python script sends the microphone stream to Deepgram and prints out the JSON output. The Emacs Lisp code starts an asynchronous process and reads the JSON output, displaying the transcript and calculating the WPM based on the words. run.sh just activates the venv for this project (requirements.txt is based on the streaming test suite) and then runs app.py, since some of the Python library versions conflict with other things I want to experiment with.
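To make the hand-off concrete, here's a minimal sketch of what the Emacs side does with one line of that output. The JSON below is made up, but it uses the same keys (transcript, and words with start/end times) that the handler functions look at:

;; Illustrative only: a made-up line shaped like the output from app.py.
(let ((line "{\"transcript\": \"hello world\", \"words\": [{\"word\": \"hello\", \"start\": 0.1, \"end\": 0.4}, {\"word\": \"world\", \"start\": 0.5, \"end\": 0.9}]}"))
  (let-alist (json-parse-string line :object-type 'alist)
    (list .transcript (length .words))))
;; => ("hello world" 2)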
I also added my-live-speech-wpm-string to my mode-line-format manually using Customize, since I wanted it displayed on the left side instead of getting lost when I turn keycast-mode on.
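If you'd rather do it in Lisp than through Customize, something like this sketch should be roughly equivalent. Note that because the string is propertized, the variable needs to be marked risky for its face to actually show up in the mode line, which might also be related to the color issue noted further down:

;; Sketch: show the WPM string near the start of the mode line.
(put 'my-live-speech-wpm-string 'risky-local-variable t)
(unless (memq 'my-live-speech-wpm-string (default-value 'mode-line-format))
  (setq-default mode-line-format
                (cons 'my-live-speech-wpm-string
                      (default-value 'mode-line-format))))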
I'm still a little anxious about accidentally leaving a process running, so I check with ps aux | grep python3. Eventually I'll figure out how to make sure everything gets properly stopped when I'm done.
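One possible safeguard is to stop the process automatically when Emacs exits. It may also help to have run.sh end with exec python3 app.py, so that the process Emacs kills is the Python process itself rather than a bash wrapper that could leave an orphaned child behind.

;; Sketch: stop the speech process when Emacs exits so nothing is left running.
(add-hook 'kill-emacs-hook #'my-live-speech-stop)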
Anyway, there it is!
Display in speech buffer
(defun my-live-speech-display-in-speech-buffer (recognition-results)
  (with-current-buffer (get-buffer-create my-live-speech-buffer)
    (let-alist recognition-results
      (let* ((pos (point))
             (at-end (eobp)))
        (goto-char (point-max))
        (unless (eolp) (insert "\n"))
        (when .msg
          (insert .msg "\n"))
        (when .transcript
          (insert .transcript "\n"))
        ;; scroll to the bottom if being displayed
        (if at-end
            (when (get-buffer-window (current-buffer))
              (set-window-point (get-buffer-window (current-buffer)) (point)))
          (goto-char pos))))))

(defun my-live-speech-toggle-heading ()
  "Toggle a line as a heading."
  (interactive)
  (with-current-buffer (get-buffer my-live-speech-buffer)
    (display-buffer (current-buffer))
    (with-selected-window (get-buffer-window (get-buffer my-live-speech-buffer))
      (let ((avy-all-windows nil))
        (avy-goto-line 1))
      (org-toggle-heading 1))))

(defun my-live-speech-cycle-visibility ()
  "Get a quick overview."
  (interactive)
  (with-current-buffer (get-buffer my-live-speech-buffer)
    (display-buffer (current-buffer))
    (if (eq org-cycle-global-status 'contents)
        (progn
          (run-hook-with-args 'org-cycle-pre-hook 'all)
          (org-fold-show-all '(headings blocks))
          (setq org-cycle-global-status 'all)
          (run-hook-with-args 'org-cycle-hook 'all))
      (run-hook-with-args 'org-cycle-pre-hook 'contents)
      (org-cycle-content)
      (setq org-cycle-global-status 'contents)
      (run-hook-with-args 'org-cycle-hook 'contents))))
Display words per minute
(defvar my-live-speech-wpm-window-seconds 15 "How many seconds to calculate WPM for.")
(defvar my-live-speech-recent-words nil "Words spoken in `my-live-speech-wpm-window-seconds'.")
(defvar my-live-speech-wpm nil "Current WPM.")
(defvar my-live-speech-wpm-colors ; haven't figured out how to make these work yet
  '((180 :foreground "red")
    (170 :foreground "yellow")
    (160 :foreground "green")))
(defvar my-live-speech-wpm-string nil "Add this somewhere in `mode-line-format'.")

(defun my-live-speech-wpm-string ()
  (propertize
   (format "%d WPM " my-live-speech-wpm)
   'face
   (cdr (seq-find (lambda (row) (> my-live-speech-wpm (car row)))
                  my-live-speech-wpm-colors))))

(defun my-live-speech-display-wpm (recognition-results)
  (let-alist recognition-results
    (when .words
      ;; calculate WPM
      (setq my-live-speech-recent-words
            (append my-live-speech-recent-words .words nil))
      (let ((threshold (- (assoc-default 'end (aref .words (1- (length .words))))
                          my-live-speech-wpm-window-seconds)))
        (setq my-live-speech-recent-words
              (seq-filter (lambda (o) (>= (assoc-default 'start o) threshold))
                          my-live-speech-recent-words))
        (setq my-live-speech-wpm
              (/ (length my-live-speech-recent-words)
                 (/ (- (assoc-default 'end (aref .words (1- (length .words))))
                       (assoc-default 'start (car my-live-speech-recent-words)))
                    60.0)))
        (setq my-live-speech-wpm-string (my-live-speech-wpm-string))))))
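To make the arithmetic concrete: if the recent window holds 40 words and they span 13.5 seconds of audio (from the start of the first retained word to the end of the last one), that works out to 40 / (13.5 / 60) ≈ 178 WPM.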
Append to EmacsConf Etherpad
(defvar my-live-speech-etherpad-id nil)

(defun my-live-speech-append-to-etherpad (recognition-results)
  (when my-live-speech-etherpad-id
    (emacsconf-pad-append-text
     my-live-speech-etherpad-id
     (concat " " (assoc-default 'transcript recognition-results)))))
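To actually route text to a pad, the ID just needs to be set first. The value below is a placeholder; what goes here depends on how emacsconf-pad is configured:

;; Placeholder pad ID; use whatever identifier emacsconf-pad expects for your pad.
(setq my-live-speech-etherpad-id "live-speech-notes")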