Getting live speech into Emacs with Deepgram's streaming API

| emacs, speech, streaming

This is a quick demonstration of using Deepgram's streaming API to do speech recognition live. It isn't as accurate as OpenAI Whisper but since Whisper doesn't have a streaming API, it'll do for now. I can correct misrecognized words manually. I tend to talk really quickly, so it displays the words per minute in my modeline. I put the words into an Org Mode buffer so I can toggle headings with avy and cycle visibility. When I'm done, it saves the text, JSON, and WAV for further processing. I think it'll be handy to have a quick way to take live notes during interviews or when I'm thinking out loud. Could be fun!

I'm still getting some weirdness when the mode turns on when I don't expect it, so that's something to look into. Maybe I won't use it as a mode for now. I'll just use my-live-speech-start and my-live-speech-stop.

General code

(defvar my-live-speech-buffer "*Speech*")
(defvar my-live-speech-process nil)
(defvar my-live-speech-output-buffer "*Speech JSON*")

(defvar my-live-speech-functions
  '(my-live-speech-display-in-speech-buffer
    my-live-speech-display-wpm
    my-live-speech-append-to-etherpad)
  "Functions to call with one argument, the recognition results.")

(defun my-live-speech-start ()
  "Turn on live captions."
  (interactive)
  (with-current-buffer (get-buffer-create my-live-speech-buffer)
    (unless (process-live-p my-live-speech-process)
      (let ((default-directory "~/proj/deepgram-live"))
        (message "%s" default-directory)
        (with-current-buffer (get-buffer-create my-live-speech-output-buffer)
          (erase-buffer))
        (setq my-live-speech-recent-words nil
              my-live-speech-wpm-string "READY ")
        (setq my-deepgram-process
              (make-process
               :command '("bash" "run.sh")
               :name "speech"
               :filter 'my-live-speech-json-filter
               :sentinel #'my-live-speech-process-sentinel
               :buffer my-live-speech-output-buffer)))
      (org-mode))
    (display-buffer (current-buffer))))

(defun my-live-speech-stop ()
  (interactive)
  (if (process-live-p my-live-speech-process)
      (kill-process my-live-speech-process))
  (setq my-live-speech-wpm-string nil))

;; (define-minor-mode my-live-speech-mode
;;  "Show live speech and display WPM.
;; Need to check how to reliably turn this on and off."
;;  :global t :group 'sachac
;;  (if my-live-speech-mode
;;      (my-live-speech-start)
;;    (my-live-speech-stop)
;;    (setq my-live-speech-wpm-string nil)))

;; based on subed-mpv::client-filter
(defun my-live-speech-handle-json (line-object)
  "Process the JSON object in LINE."
  (run-hook-with-args 'my-live-speech-functions (json-parse-string line :object-type 'alist)))

(defun my-live-speech-process-sentinel (proc event)
  (when (string-match "finished" event)
    (my-live-speech-stop)
    ;(my-live-speech-mode -1)
    ))

(defun my-live-speech-json-filter (proc string)
  (when (buffer-live-p (process-buffer proc))
    (with-current-buffer (process-buffer proc)
      (let* ((proc-mark (process-mark proc))
             (moving (= (point) proc-mark)))
        ;;  insert the output
        (save-excursion
          (goto-char proc-mark)
          (insert string)
          (set-marker proc-mark (point)))
        (if moving (goto-char proc-mark))
        ;; process and remove all complete lines of JSON (lines are complete if ending with \n)
        (let ((pos (point-min)))
          (while (progn (goto-char pos)
                        (end-of-line)
                        (equal (following-char) ?\n))
            (let* ((end (point))
                   (line (buffer-substring pos end)))
              (delete-region pos (+ end 1))
              (with-current-buffer (get-buffer my-live-speech-buffer)
                (my-live-speech-handle-json line)))))))))

Python code based on the Deepgram streaming test suite:

Very rough app.py
# Based on streaming-test-suite
# https://developers.deepgram.com/docs/getting-started-with-the-streaming-test-suite

import pyaudio
import asyncio
import json
import os
import websockets
from datetime import datetime
import wave
import sys

startTime = datetime.now()
key = os.environ['DEEPGRAM_API_KEY']
live_json = os.environ.get('LIVE_CAPTIONS_JSON', True)
all_mic_data = []
all_transcripts = []
all_words = []
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK = 8000

audio_queue = asyncio.Queue()
REALTIME_RESOLUTION = 0.250
SAMPLE_SIZE = 0

def save_info():
    global SAMPLE_SIZE
    base = startTime.strftime('%Y%m%d%H%M')
    wave_file_path = os.path.abspath(f"{base}.wav")
    wave_file = wave.open(wave_file_path, "wb")
    wave_file.setnchannels(CHANNELS)
    wave_file.setsampwidth(SAMPLE_SIZE)
    wave_file.setframerate(RATE)
    wave_file.writeframes(b"".join(all_mic_data))
    wave_file.close()
    with open(f"{base}.txt", "w") as f:
        f.write("\n".join(all_transcripts))
    with open(f"{base}.json", "w") as f:
        f.write(json.dumps(all_words))
    if live_json:
        print(f'{{"msg": "🟢 Saved to {base}.txt , {base}.json , {base}.wav", "base": "{base}"}}')
    else:
        print(f"🟢 Saved to {base}.txt , {base}.json , {base}.wav")

# Used for microphone streaming only.
def mic_callback(input_data, frame_count, time_info, status_flag):
    audio_queue.put_nowait(input_data)
    return (input_data, pyaudio.paContinue)

async def run(key, method="mic", format="text", **kwargs):
    deepgram_url = f'wss://api.deepgram.com/v1/listen?punctuate=true&smart_format=true&utterances=true&encoding=linear16&sample_rate=16000'
    async with websockets.connect(
        deepgram_url, extra_headers={"Authorization": "Token {}".format(key)}
    ) as ws:
        async def sender(ws):
            try:
                while True:
                    mic_data = await audio_queue.get()
                    all_mic_data.append(mic_data)
                    await ws.send(mic_data)
            except websockets.exceptions.ConnectionClosedOK:
                await ws.send(json.dumps({"type": "CloseStream"}))
                if live_json:
                    print('{"msg": "Closed."}')
                else:
                    print("Closed.")
        async def receiver(ws):
            global all_words
            """Print out the messages received from the server."""
            first_message = True
            first_transcript = True
            transcript = ""
            async for msg in ws:
                res = json.loads(msg)
                if first_message:
                    first_message = False
                try:
                    # handle local server messages
                    if res.get("msg"):
                        if live_json:
                            print(json.dumps(res))
                        else:
                            print(res["msg"])
                    if res.get("is_final"):
                        transcript = (
                            res.get("channel", {})
                            .get("alternatives", [{}])[0]
                            .get("transcript", "")
                        )
                        if transcript != "":
                            if first_transcript:
                                first_transcript = False
                            if live_json:
                                print(json.dumps(res.get("channel", {}).get("alternatives", [{}])[0]))
                            else:
                                print(transcript)
                            all_transcripts.append(transcript)
                            all_words = all_words + res.get("channel", {}).get("alternatives", [{}])[0].get("words", [])
                        # if using the microphone, close stream if user says "goodbye"
                        if method == "mic" and "goodbye" in transcript.lower():
                            await ws.send(json.dumps({"type": "CloseStream"}))
                            if live_json:
                                print('{"msg": "Done."}')
                            else:
                                print("Done.")
                    # handle end of stream
                    if res.get("created"):
                        save_info()
                except KeyError:
                    print(f"🔴 ERROR: Received unexpected API response! {msg}")

        # Set up microphone if streaming from mic
        async def microphone():
            audio = pyaudio.PyAudio()
            stream = audio.open(
                format=FORMAT,
                channels=CHANNELS,
                rate=RATE,
                input=True,
                frames_per_buffer=CHUNK,
                stream_callback=mic_callback,
            )

            stream.start_stream()

            global SAMPLE_SIZE
            SAMPLE_SIZE = audio.get_sample_size(FORMAT)

            while stream.is_active():
                await asyncio.sleep(0.1)

            stream.stop_stream()
            stream.close()

        functions = [
            asyncio.ensure_future(sender(ws)),
            asyncio.ensure_future(receiver(ws)),
        ]

        functions.append(asyncio.ensure_future(microphone()))
        if live_json:
            print('{"msg": "Ready."}')
        else:
            print("🟢 Ready.")
        await asyncio.gather(*functions)

def main():
    """Entrypoint for the example."""
    # Parse the command-line arguments.
    try:
        asyncio.run(run(key, "mic", "text"))
    except websockets.exceptions.InvalidStatusCode as e:
        print(f'🔴 ERROR: Could not connect to Deepgram! {e.headers.get("dg-error")}')
        print(
            f'🔴 Please contact Deepgram Support (developers@deepgram.com) with request ID {e.headers.get("dg-request-id")}'
        )
        return
    except websockets.exceptions.ConnectionClosedError as e:
        error_description = f"Unknown websocket error."
        print(
            f"🔴 ERROR: Deepgram connection unexpectedly closed with code {e.code} and payload {e.reason}"
        )

        if e.reason == "DATA-0000":
            error_description = "The payload cannot be decoded as audio. It is either not audio data or is a codec unsupported by Deepgram."
        elif e.reason == "NET-0000":
            error_description = "The service has not transmitted a Text frame to the client within the timeout window. This may indicate an issue internally in Deepgram's systems or could be due to Deepgram not receiving enough audio data to transcribe a frame."
        elif e.reason == "NET-0001":
            error_description = "The service has not received a Binary frame from the client within the timeout window. This may indicate an internal issue in Deepgram's systems, the client's systems, or the network connecting them."

        print(f"🔴 {error_description}")
        # TODO: update with link to streaming troubleshooting page once available
        # print(f'🔴 Refer to our troubleshooting suggestions: ')
        print(
            f"🔴 Please contact Deepgram Support (developers@deepgram.com) with the request ID listed above."
        )
        return

    except websockets.exceptions.ConnectionClosedOK:
        return

    except Exception as e:
        print(f"🔴 ERROR: Something went wrong! {e}")
        save_info()
        return


if __name__ == "__main__":
    sys.exit(main() or 0)

The Python script sends the microphone stream to Deepgram and prints out the JSON output. The Emacs Lisp code starts an asynchronous process and reads the JSON output, displaying the transcript and calculating the WPM based on the words. run.sh just loads the venv for this project (requirements.txt based on the streaming text suite) and then runs app.py, since some of the Python library versions conflict with other things I want to experiment with.

I also added my-live-speech-wpm-string to my mode-line-format manually using Customize, since I wanted it displayed on the left side instead of getting lost when I turn keycast-mode on.

I'm still a little anxious about accidentally leaving a process running, so I check with ps aux | grep python3. Eventually I'll figure out how to make sure everything gets properly stopped when I'm done.

Anyway, there it is!

Display in speech buffer

(defun my-live-speech-display-in-speech-buffer (recognition-results)
  (with-current-buffer (get-buffer-create my-live-speech-buffer)
    (let-alist recognition-results
      (let* ((pos (point))
             (at-end (eobp)))
        (goto-char (point-max))
        (unless (eolp) (insert "\n"))
        (when .msg
          (insert .msg "\n"))
        (when .transcript
          (insert .transcript "\n"))
        ;; scroll to the bottom if being displayed
        (if at-end
            (when (get-buffer-window (current-buffer))
              (set-window-point (get-buffer-window (current-buffer)) (point)))
          (goto-char pos))))))

(defun my-live-speech-toggle-heading ()
  "Toggle a line as a heading."
  (interactive)
  (with-current-buffer (get-buffer my-live-speech-buffer)
    (display-buffer (current-buffer))
    (with-selected-window (get-buffer-window (get-buffer my-live-speech-buffer))
      (let ((avy-all-windows nil))
        (avy-goto-line 1))
      (org-toggle-heading 1))))

(defun my-live-speech-cycle-visibility ()
  "Get a quick overview."
  (interactive)
  (with-current-buffer (get-buffer my-live-speech-buffer)
    (display-buffer (current-buffer))
    (if (eq org-cycle-global-status 'contents)
        (progn
          (run-hook-with-args 'org-cycle-pre-hook 'all)
          (org-fold-show-all '(headings blocks))
          (setq org-cycle-global-status 'all)
          (run-hook-with-args 'org-cycle-hook 'all))
      (run-hook-with-args 'org-cycle-pre-hook 'contents)
      (org-cycle-content)
      (setq org-cycle-global-status 'contents)
      (run-hook-with-args 'org-cycle-hook 'contents))))

Display words per minute

(defvar my-live-speech-wpm-window-seconds 15 "How many seconds to calculate WPM for.")
(defvar my-live-speech-recent-words nil "Words spoken in `my-live-speech-wpm-window-minutes'.")
(defvar my-live-speech-wpm nil "Current WPM.")
(defvar my-live-speech-wpm-colors  ; haven't figured out how to make these work yet
  '((180 :foreground "red")
    (170 :foreground "yellow")
    (160 :foreground "green")))
(defvar my-live-speech-wpm-string nil "Add this somewhere in `mode-line-format'.")
(defun my-live-speech-wpm-string ()
  (propertize
   (format "%d WPM " my-live-speech-wpm)
   'face
   (cdr (seq-find (lambda (row) (> my-live-speech-wpm (car row))) my-live-speech-wpm-colors))))

(defun my-live-speech-display-wpm (recognition-results)
  (let-alist recognition-results
    (when .words
      ;; calculate WPM
      (setq my-live-speech-recent-words
            (append my-live-speech-recent-words .words nil))
      (let ((threshold (- (assoc-default 'end (aref .words (1- (length .words))))
                          my-live-speech-wpm-window-seconds)))
        (setq my-live-speech-recent-words
              (seq-filter
               (lambda (o)
                 (>= (assoc-default 'start o)
                     threshold))
               my-live-speech-recent-words))
        (setq my-live-speech-wpm
              (/
               (length my-live-speech-recent-words)
               (/ (- (assoc-default 'end (aref .words (1- (length .words))))
                     (assoc-default 'start (car my-live-speech-recent-words)))
                  60.0)))
        (setq my-live-speech-wpm-string (my-live-speech-wpm-string))))))

Append to EmacsConf Etherpad

(defvar my-live-speech-etherpad-id nil)
(defun my-live-speech-append-to-etherpad (recognition-results)
  (when my-live-speech-etherpad-id
    (emacsconf-pad-append-text my-live-speech-etherpad-id (concat " " (assoc-default 'transcript recognition-results)))))
This is part of my Emacs configuration.
You can comment with Disqus or you can e-mail me at sacha@sachachua.com.