If you were to ask 10 people what “streaming” is, you would get 10 different answers. The only thing they would agree on is the timing… streaming is processing data in real-time. The minute the data is written to a persistent store and then queried, you’ve lost real-time.
What about combining streaming with Artificial Intelligence (AI)? Does real-time take a different shape? Are the rules rewritten? The short answer: no. In terms of processing data in real time, it doesn’t matter what the processing is. What matters is how the data flows.
The flow of streaming data almost always includes a message broker, mainly because there are usually many ways one wants to “watch” data and many ways that data can be ingested.
The LangStream project combines these two worlds (streaming data and AI) to create an enterprise-ready environment for creating no/low-code AI processing pipelines that work in real-time.
Let’s look at the latest features in LangStream 0.4.
Stateful agents
When it comes to streaming, holding a pipeline’s state can be a controversial topic. At its core, streaming data in real-time means staying in memory and processing as fast as possible. When lookups are added, that core definition begins to skew. But we’re creating applications to do something meaningful, not enforce a pure model. Holding state in a data processing pipeline can be a significant feature that takes it to the next level.
LangStream 0.4 added the option for all agents to hold state. Because LangStream runs natively in Kubernetes, mapping a disk was almost too easy. In the K8s cluster where LangStream is running, create a Storage Class (or stick with default) and provide its name in the “resources” area of any agent. LangStream will persist the agent’s output record(s) to the store.
- name: "Stateful processing using Python"
  resources:
    disk:
      enabled: true
      size: 50M
  id: "my-python-processor"
  type: "python-processor"
If you wanted to get the most out of stateful agents, you could create a custom Python agent using the stateful option. The mapped drive’s directory is included in the context, where your custom Python code could read and write as needed.
from langstream import SimpleRecord, Processor, AgentContext
import os

class Exclamation(Processor):
    def init(self, config, context: AgentContext):
        self.context = context

    def process(self, record):
        directory = self.context.get_persistent_state_directory()
        counter_file = os.path.join(directory, "counter.txt")
        ...
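As one illustration of what the elided body of process could do with that directory, here is a minimal standalone sketch of a counter kept on disk. It is not LangStream code: the helper name increment_counter is hypothetical, and it takes the directory as a plain argument (instead of calling get_persistent_state_directory()) so it can run outside a pipeline.

```python
import os
import tempfile


def increment_counter(directory: str, filename: str = "counter.txt") -> int:
    """Read an integer from a file in `directory`, add one, and write it back.

    Hypothetical helper: inside an agent, `directory` would be the value
    returned by context.get_persistent_state_directory().
    """
    counter_file = os.path.join(directory, filename)
    count = 0
    if os.path.exists(counter_file):
        with open(counter_file, "r", encoding="utf-8") as f:
            content = f.read().strip()
            count = int(content) if content else 0
    count += 1
    # Write to a temporary file first, then rename it into place, so a
    # crash mid-write does not leave a half-written counter behind.
    tmp_file = counter_file + ".tmp"
    with open(tmp_file, "w", encoding="utf-8") as f:
        f.write(str(count))
    os.replace(tmp_file, counter_file)
    return count


if __name__ == "__main__":
    # A temporary directory stands in for the agent's mapped disk.
    with tempfile.TemporaryDirectory() as state_dir:
        print(increment_counter(state_dir))
        print(increment_counter(state_dir))
```

Because the count lives in a file rather than in memory, it would survive a restart of the process as long as the same directory is mounted again.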
All the rules of Kubernetes storage apply, so high availability, geo-distribution, and high resilience are just a few of the benefits. Looking up data on a volume is also more efficient than navigating all those network hops and firewalls.
Read the docs to learn more about stateful agents.
Apache Camel as a pipeline source
Apache Camel “is an Open Source integration framework that empowers you to quickly and easily integrate various systems consuming or producing data”.
The power of Apache Camel is in its defined framework and specification that integrators follow. Instead of building your own source or sink connection for all the different platforms in your organization, implement an Apache Camel client and instantly have access to all of them.
This same principle is a natural fit in LangStream, but it’s taken a step further. There is a new “camel-source” agent type that removes the need for any code to be written. Just specify which Apache Camel integration you want to be the pipeline’s source, along with the applicable values, and you’ve got a new custom LangStream source!
See a working example or read more about the agent in the docs.
New “Service” agent component type
Agents in LangStream represent runnable processes. To run a process you need configuration, details about 3rd party connections, an API client, or executable binary, among other assets. When all this is organized together with a lifecycle manager, you have a LangStream agent that is runnable by the LangStream runtime.
Agents are implemented as a step in a pipeline. Their configuration, type, and other values are set so LangStream can plan its lifecycle and execute it. You have the option to combine multiple agents into a single executable, which provides a very convenient way to optimize processing.
Agents do things like completions with LLMs and generating embeddings. They also integrate quite well with other AI libraries like LangChain. Agents can help transform data into a known (JSON) structure, to make processing more stable.
Each type of agent also carries a “component type”. This is something that is set within the agent type. Since its creation, LangStream has offered component types of source, sink, and processor. Source and sink represent the beginning and end of data processing. A processor represents an action that expects data as input and outputs its results but doesn’t persist any data.
If your LangStream pipeline doesn’t have a source, then it is assumed the first step will watch a message topic for data. Then the question becomes, where is that data coming from? Manually producing topic data is fine for testing but not very reasonable in production. Requiring other services to implement a messaging client to produce their data to the topic is a perfectly fine solution for production but not every service has the option to bring in (and manage) a client library.
The new service component type gives LangStream agents the ability to start a web server and watch an assigned port for (HTTP) messages.
Similar to how a source component watches a service for new documents and how the input messaging topic watches for new records, a service component has a web service that captures request data and passes it to the pipeline for processing.
To aid in implementing the new service component a new “python-service” agent type has also been introduced. One of the main differences between this agent and others is that this will be a long-running process. So LangStream runtime should handle its lifecycle a bit differently.
Examples of using the python-service agent:
LangServe Application
This new feature in the LangChain project provides a pre-configured (opinionated) web service that hosts a few endpoints for ingesting data. This is a perfect fit for a python-service agent. It binds the pod’s routable port with the web service.
FastAPI
The FastAPI project is a framework to create “high performance, easy to learn, fast to code, ready for production” API endpoints in your Python application. Simply implement an HTTP POST endpoint and provide that in the python-service agent.
Uvicorn
Uvicorn is “an ASGI web server, for Python”. It can create simple HTTP endpoints that work perfectly in the python-service agent.
Virtually any Python framework that implements a web service and is bindable to the Pod’s port is compatible with the python-service agent. An easy way to get started with the agent is to follow LangStream’s LangServe example application. This simple pipeline offers a pre-built LangChain application that implements LangServe as a means to run the ChatOpenAI flow.
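To make the "HTTP POST endpoint" idea concrete, here is a minimal sketch of a web server that accepts a JSON body and replies with an acknowledgement. It deliberately uses only the Python standard library so it runs without extra dependencies; a real python-service agent would more likely use FastAPI, Uvicorn, or LangServe as described above. The names EchoHandler, start_server, and post_json are hypothetical, and how the server is wired into a LangStream agent is not shown here.

```python
import http.client
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


class EchoHandler(BaseHTTPRequestHandler):
    """Accept a JSON POST body and reply with a JSON acknowledgement."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        raw = self.rfile.read(length)
        try:
            payload = json.loads(raw or b"{}")
            status, reply = 200, {"received": payload}
        except ValueError:
            # Covers both malformed JSON and undecodable bytes.
            status, reply = 400, {"error": "body must be valid JSON"}
        body = json.dumps(reply).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep the demo output quiet


def start_server(port=0):
    """Start the server on a background thread; port 0 picks a free port."""
    server = HTTPServer(("127.0.0.1", port), EchoHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def post_json(host, port, path, payload):
    """Send a JSON POST and return (status, decoded JSON reply)."""
    conn = http.client.HTTPConnection(host, port, timeout=5)
    try:
        conn.request("POST", path, body=json.dumps(payload).encode("utf-8"),
                     headers={"Content-Type": "application/json"})
        resp = conn.getresponse()
        return resp.status, json.loads(resp.read())
    finally:
        conn.close()


if __name__ == "__main__":
    server = start_server()
    host, port = server.server_address
    print(post_json(host, port, "/", {"value": "hello"}))
    server.shutdown()
    server.server_close()
```

The important property is the one the text describes: the process is long-running and listens on a port, rather than handling one record and returning.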
Read more about the python-service agent in the documentation.
New “HTTP” and “Service” gateway types
A LangStream Gateway is a WebSocket service that removes the need to implement a messaging client. Simply open the custom “produce” socket, send data and the pipeline will kick off its processing. If the pipeline includes an output topic, use the gateway “consume” socket to watch for data.
Almost all programming languages offer a WebSocket library. But sometimes your application just doesn’t need the added overhead of managing an open socket. That is where HTTP shines. Its request/response model offers a simple way to move data between services.
Starting in LangStream 0.4, all gateways using the “produce” type can accept both WebSocket and HTTP/1.1 connections. If your application doesn’t need the overhead of managing an open socket, use its HTTP client to send a POST request. LangStream will use the request’s body to create a new message on the pipeline’s input topic.
An example gateway manifest would include:
gateways:
  - id: "user-input"
    type: produce
    topic: "questions-topic"
    parameters:
      - sessionId
Then you could use curl to POST a new message:
curl -X POST -d '{"value": {"question": "hello"}, "key": "k1", "headers": {"h1": "v1"}}' -H 'Content-Type: application/json' "http://localhost:8091/api/gateways/produce/my-tenant/my-app/user-input?param:sessionId=12543yusi1"
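The same request can be assembled from application code. The following is a minimal sketch using only the Python standard library; the function name build_produce_request is hypothetical, while the URL layout, the "param:" query prefix, and the body fields (value, key, headers) are taken from the curl example above. It builds the request without sending it, since sending requires a running gateway.

```python
import json
import urllib.parse
import urllib.request


def build_produce_request(base_url, tenant, application, gateway_id,
                          value, key=None, headers=None, params=None):
    """Build (but do not send) a POST request for a "produce" gateway."""
    path = "/api/gateways/produce/{}/{}/{}".format(
        urllib.parse.quote(tenant, safe=""),
        urllib.parse.quote(application, safe=""),
        urllib.parse.quote(gateway_id, safe=""),
    )
    # Gateway parameters travel in the query string with a "param:" prefix,
    # mirroring the curl example.
    query = urllib.parse.urlencode(
        {"param:" + name: val for name, val in (params or {}).items()},
        safe=":",
    )
    url = base_url.rstrip("/") + path + ("?" + query if query else "")
    body = {"value": value}
    if key is not None:
        body["key"] = key
    if headers:
        body["headers"] = headers
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    req = build_produce_request(
        "http://localhost:8091", "my-tenant", "my-app", "user-input",
        value={"question": "hello"}, key="k1", headers={"h1": "v1"},
        params={"sessionId": "12543yusi1"},
    )
    print(req.get_method(), req.full_url)
    # Sending requires a running gateway:
    # with urllib.request.urlopen(req, timeout=10) as resp:
    #     print(resp.status)
```

Serializing the body with json.dumps, rather than writing the JSON by hand, avoids quoting mistakes in keys such as "headers".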
Watch for a response
If you would also like to connect the output of the pipeline with the HTTP response, you can use the new gateway “service” type. This new gateway type is a dedicated HTTP socket that accepts data as POST and keeps the connection open until the pipeline completes processing, responding with the result.
The gateway uses unique identifiers to keep track of each HTTP request, making sure not to mix messages between requests. Timeouts and the usual HTTP tooling are available to manage the connection.
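The idea of tracking each request by a unique identifier can be sketched in a few lines. This is a conceptual illustration only, not LangStream's implementation: the class RequestCorrelator and the fake_pipeline stand-in are hypothetical, and a real gateway would carry the identifier through message topics rather than an in-memory queue.

```python
import queue
import threading
import uuid
from concurrent.futures import Future
from concurrent.futures import TimeoutError as FutureTimeout


class RequestCorrelator:
    """Match replies from a shared output channel to the request that caused them."""

    def __init__(self):
        self._pending = {}
        self._lock = threading.Lock()

    def register(self):
        """Create a unique id for a new request and a Future for its reply."""
        request_id = str(uuid.uuid4())
        future = Future()
        with self._lock:
            self._pending[request_id] = future
        return request_id, future

    def resolve(self, request_id, result):
        """Deliver a reply; replies for unknown or expired ids are dropped."""
        with self._lock:
            future = self._pending.pop(request_id, None)
        if future is None:
            return False
        future.set_result(result)
        return True

    def wait(self, request_id, future, timeout):
        """Block until the reply arrives or the timeout expires."""
        try:
            return future.result(timeout=timeout)
        finally:
            # Forget the request either way so a late reply is dropped.
            with self._lock:
                self._pending.pop(request_id, None)


def fake_pipeline(inputs, correlator):
    """Stand-in for a pipeline: upper-cases each value and replies by id."""
    while True:
        item = inputs.get()
        if item is None:
            break
        request_id, value = item
        correlator.resolve(request_id, value.upper())


if __name__ == "__main__":
    correlator = RequestCorrelator()
    inputs = queue.Queue()
    worker = threading.Thread(target=fake_pipeline, args=(inputs, correlator))
    worker.start()
    request_id, future = correlator.register()
    inputs.put((request_id, "hello"))
    try:
        print(correlator.wait(request_id, future, timeout=5))
    except FutureTimeout:
        print("pipeline did not answer in time")
    inputs.put(None)
    worker.join()
```

Each caller waits only on its own Future, so two concurrent requests cannot receive each other's results, and a request that times out is forgotten so its late reply is discarded.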
An example gateway manifest would include:
gateways:
  - id: "user-input-await"
    type: service
    parameters:
      - sessionId
    service-options:
      input-topic: inputs
      output-topic: results
Then you could POST data and wait for the result with curl:
curl -X POST -d '{"value": "hello"}' -H 'Content-Type: application/json' "http://localhost:8091/api/gateways/service/my-tenant/my-app/user-input-await"
Proxy to python-service agent
If you read the previous section about the new service component type and its complementing new “python-service” agent type, you will no doubt see a natural fit between the gateway HTTP capabilities and running a Python web server. You can set the gateway type to “service” and point to an agent that implements the python-service type. This creates a way to put the gateway at the edge of the network and proxy requests to the pipeline.
Remember that LangStream gateways support integration with Google and GitHub identity services. Using a gateway as a proxy to the python-service means you could include OAuth authentication without writing one line of code!
An example of this using Google Identity with the gateway as a proxy:
gateways:
  - id: "my-service"
    type: service
    parameters:
      - sessionId
    service-options:
      agent-id: my-service-agent
    authentication:
      provider: google
      allow-test-mode: true
      configuration:
        clientId: "${secrets.google.client-id}"
    produce-options:
      headers:
        - key: langstream-client-user-id
          value-from-authentication: subject
        - key: langstream-client-session-id
          value-from-parameters: sessionId

pipeline:
  - name: "Start my service"
    id: my-service-agent
    type: "python-service"
    configuration:
      className: example.ChatBotService
curl -X POST \
-d '{"value": "hello"}' \
-H 'Authorization: Bearer XXX' \
-H 'Content-Type: application/json' \
"http://localhost:8091/api/gateways/service/my-tenant/my-app/my-service/the/custom/path?service-param-1=yes"
Learn more about the HTTP gateway in the LangStream documentation.