from __future__ import annotations import json from typing import Dict, Literal, TypedDict from urllib.error import HTTPError, URLError from urllib.parse import urlencode from urllib.request import Request, urlopen from typing_extensions import NotRequired from speech_recognition.audio import AudioData from speech_recognition.exceptions import RequestError, UnknownValueError class Alternative(TypedDict): transcript: str confidence: float class Result(TypedDict): alternative: list[Alternative] final: bool class GoogleResponse(TypedDict): result: list[Result] result_index: NotRequired[int] ProfanityFilterLevel = Literal[0, 1] RequestHeaders = Dict[str, str] ENDPOINT = "http://www.google.com/speech-api/v2/recognize" class RequestBuilder: def __init__( self, *, endpoint: str, key: str, language: str, filter_level: ProfanityFilterLevel, ) -> None: self.endpoint = endpoint self.key = key self.language = language self.filter_level = filter_level def build(self, audio_data: AudioData) -> Request: if not isinstance(audio_data, AudioData): raise ValueError("``audio_data`` must be audio data") url = self.build_url() headers = self.build_headers(audio_data) flac_data = self.build_data(audio_data) request = Request(url, data=flac_data, headers=headers) return request def build_url(self) -> str: """ >>> builder = RequestBuilder(endpoint="http://www.google.com/speech-api/v2/recognize", key="awesome-key", language="en-US", filter_level=0) >>> builder.build_url() 'http://www.google.com/speech-api/v2/recognize?client=chromium&lang=en-US&key=awesome-key&pFilter=0' """ params = urlencode( { "client": "chromium", "lang": self.language, "key": self.key, "pFilter": self.filter_level, } ) return f"{self.endpoint}?{params}" def build_headers(self, audio_data: AudioData) -> RequestHeaders: """ >>> builder = RequestBuilder(endpoint="", key="", language="", filter_level=1) >>> audio_data = AudioData(b"", 16_000, 1) >>> builder.build_headers(audio_data) {'Content-Type': 'audio/x-flac; rate=16000'} """ rate = audio_data.sample_rate headers = {"Content-Type": f"audio/x-flac; rate={rate}"} return headers def build_data(self, audio_data: AudioData) -> bytes: flac_data = audio_data.get_flac_data( convert_rate=self.to_convert_rate(audio_data.sample_rate), convert_width=2, # audio samples must be 16-bit ) return flac_data @staticmethod def to_convert_rate(sample_rate: int) -> int: """Audio samples must be at least 8 kHz >>> RequestBuilder.to_convert_rate(16_000) >>> RequestBuilder.to_convert_rate(8_000) >>> RequestBuilder.to_convert_rate(7_999) 8000 """ return None if sample_rate >= 8000 else 8000 def create_request_builder( *, endpoint: str, key: str | None = None, language: str = "en-US", filter_level: ProfanityFilterLevel = 0, ) -> RequestBuilder: if not isinstance(language, str): raise ValueError("``language`` must be a string") if key is not None and not isinstance(key, str): raise ValueError("``key`` must be ``None`` or a string") if key is None: key = "AIzaSyBOti4mM-6x9WDnZIjIeyEU21OpBXqWBgw" return RequestBuilder( endpoint=endpoint, key=key, language=language, filter_level=filter_level, ) class OutputParser: def __init__(self, *, show_all: bool, with_confidence: bool) -> None: self.show_all = show_all self.with_confidence = with_confidence def parse(self, response_text: str): actual_result = self.convert_to_result(response_text) if self.show_all: return actual_result best_hypothesis = self.find_best_hypothesis( actual_result["alternative"] ) # https://cloud.google.com/speech-to-text/docs/basics#confidence-values # "Your code should not require the confidence field as it is not guaranteed to be accurate, or even set, in any of the results." confidence = best_hypothesis.get("confidence", 0.5) if self.with_confidence: return best_hypothesis["transcript"], confidence return best_hypothesis["transcript"] @staticmethod def convert_to_result(response_text: str) -> Result: r""" >>> response_text = '''{"result":[]} ... {"result":[{"alternative":[{"transcript":"one two three","confidence":0.49585345},{"transcript":"1 2","confidence":0.42899391}],"final":true}],"result_index":0} ... ''' >>> OutputParser.convert_to_result(response_text) {'alternative': [{'transcript': 'one two three', 'confidence': 0.49585345}, {'transcript': '1 2', 'confidence': 0.42899391}], 'final': True} >>> OutputParser.convert_to_result("") Traceback (most recent call last): ... speech_recognition.exceptions.UnknownValueError >>> OutputParser.convert_to_result('\n{"result":[]}') Traceback (most recent call last): ... speech_recognition.exceptions.UnknownValueError >>> OutputParser.convert_to_result('{"result":[{"foo": "bar"}]}') Traceback (most recent call last): ... speech_recognition.exceptions.UnknownValueError >>> OutputParser.convert_to_result('{"result":[{"alternative": []}]}') Traceback (most recent call last): ... speech_recognition.exceptions.UnknownValueError """ # ignore any blank blocks for line in response_text.split("\n"): if not line: continue result: list[Result] = json.loads(line)["result"] if len(result) != 0: if len(result[0].get("alternative", [])) == 0: raise UnknownValueError() return result[0] raise UnknownValueError() @staticmethod def find_best_hypothesis(alternatives: list[Alternative]) -> Alternative: """ >>> alternatives = [{"transcript": "one two three", "confidence": 0.42899391}, {"transcript": "1 2", "confidence": 0.49585345}] >>> OutputParser.find_best_hypothesis(alternatives) {'transcript': 'one two three', 'confidence': 0.42899391} >>> alternatives = [{"confidence": 0.49585345}] >>> OutputParser.find_best_hypothesis(alternatives) Traceback (most recent call last): ... speech_recognition.exceptions.UnknownValueError """ if "confidence" in alternatives: # BUG: actual_result["alternative"] (=alternatives) is list, not dict # return alternative with highest confidence score best_hypothesis: Alternative = max( alternatives, key=lambda alternative: alternative["confidence"], ) else: # when there is no confidence available, we arbitrarily choose the first hypothesis. best_hypothesis: Alternative = alternatives[0] if "transcript" not in best_hypothesis: raise UnknownValueError() return best_hypothesis def obtain_transcription(request: Request, timeout: int) -> str: try: response = urlopen(request, timeout=timeout) except HTTPError as e: raise RequestError("recognition request failed: {}".format(e.reason)) except URLError as e: raise RequestError( "recognition connection failed: {}".format(e.reason) ) return response.read().decode("utf-8") def recognize_legacy( recognizer, audio_data: AudioData, key: str | None = None, language: str = "en-US", pfilter: ProfanityFilterLevel = 0, show_all: bool = False, with_confidence: bool = False, *, endpoint: str = ENDPOINT, ): """Performs speech recognition on ``audio_data`` (an ``AudioData`` instance), using the Google Speech Recognition API. The Google Speech Recognition API key is specified by ``key``. If not specified, it uses a generic key that works out of the box. This should generally be used for personal or testing purposes only, as it **may be revoked by Google at any time**. To obtain your own API key, simply following the steps on the `API Keys `__ page at the Chromium Developers site. In the Google Developers Console, Google Speech Recognition is listed as "Speech API". The recognition language is determined by ``language``, an RFC5646 language tag like ``"en-US"`` (US English) or ``"fr-FR"`` (International French), defaulting to US English. A list of supported language tags can be found in this `StackOverflow answer `__. The profanity filter level can be adjusted with ``pfilter``: 0 - No filter, 1 - Only shows the first character and replaces the rest with asterisks. The default is level 0. Returns the most likely transcription if ``show_all`` is false (the default). Otherwise, returns the raw API response as a JSON dictionary. Raises a ``speech_recognition.UnknownValueError`` exception if the speech is unintelligible. Raises a ``speech_recognition.RequestError`` exception if the speech recognition operation failed, if the key isn't valid, or if there is no internet connection. """ request_builder = create_request_builder( endpoint=endpoint, key=key, language=language, filter_level=pfilter ) request = request_builder.build(audio_data) response_text = obtain_transcription( request, timeout=recognizer.operation_timeout ) output_parser = OutputParser( show_all=show_all, with_confidence=with_confidence ) return output_parser.parse(response_text)