Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async Chat Streaming fails #340

Open
thomashacker opened this issue Nov 9, 2023 · 1 comment
Open

Async Chat Streaming fails #340

thomashacker opened this issue Nov 9, 2023 · 1 comment

Comments

@thomashacker
Copy link

thomashacker commented Nov 9, 2023

Hey Cohere Team 馃憢 , I'm currently trying to implement async Token Streaming with Cohere (cohere==4.33). However, when running this code, I get some errors:

completion = co.chat(
    chat_history=_conversation,
    stream=True,
    message=message,
    model="command",
    temperature=0.1,
)

>>> async for chunk in completion: # Breaks here
    if isinstance(chunk, StreamTextGeneration):
        yield {
            "message": chunk.text,
            "finish_reason": "",
        }
    elif isinstance(chunk, StreamEnd):
        yield {
            "message": "",
            "finish_reason": "stop",
        }

I get this error message:

TypeError: 'async for' requires an object with __aiter__ method, got bytes

Which comes from the StreamingChat object (code):

async def __aiter__(self) -> Generator[StreamResponse, None, None]:
    index = 0
    >>> async for line in self.response.content: # self.response.content are bytes
        item = self._make_response_item(index, line)
        index += 1
        if item is not None:
            yield item

When looking at the origin of the response, I get the _request method (code) which runs:

with requests.Session() as session:
            retries = Retry(
                total=self.max_retries,
                backoff_factor=0.5,
                allowed_methods=["POST", "GET"],
                status_forcelist=cohere.RETRY_STATUS_CODES,
                raise_on_status=False,
            )
            session.mount("https://", HTTPAdapter(max_retries=retries))
            session.mount("http://", HTTPAdapter(max_retries=retries))

            if stream:
                return session.request(method, url, headers=headers, json=json, **self.request_dict, stream=True)

Not sure whether my implementation is incorrect, please let me know if my code is incorrect or if you can reproduce the error! Thanks a lot 馃殌 Also happy about any directions to examples or documentation.

Edit: It works when I remove the async for loop in my code, but then the method gets synchronous :(

@mehrdad-es
Copy link

could you past the completion object,_conversation and one iteration of what chunk is?

It seems the problem is a data type problem 'async for' requires an object with __aiter__ method, got bytes

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants