RocketRide Python SDK

Build, run, and manage AI pipelines from Python.

Quick Start

pip install rocketride

import asyncio
from rocketride import RocketRideClient

async def main():
    async with RocketRideClient(uri="https://cloud.rocketride.ai", auth="my-key") as client:
        result = await client.use(filepath="pipeline.pipe")
        token = result["token"]
        out = await client.send(token, "Hello, pipeline!", objinfo={"name": "input.txt"}, mimetype="text/plain")
        print(out)
        await client.terminate(token)

asyncio.run(main())

URI scheme: the scheme selects the transport. The client normalizes the uri to a WebSocket address before connecting: https:// and wss:// both resolve to a secure wss:// connection, while http://, ws://, and a bare host:port resolve to plain ws://. For RocketRide Cloud use https://cloud.rocketride.ai (or the equivalent wss://cloud.rocketride.ai); for a local engine use ws://localhost:5565. Caution: against a Cloud endpoint always use https:// or wss://, because an http:// or ws:// URI (or a bare host:port) silently downgrades to an unencrypted ws:// connection.
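The normalization rule above can be sketched with the standard library alone. This is an illustration of the documented mapping, not the SDK's own code; the names normalize_uri and is_encrypted are hypothetical helpers introduced here.

```python
from urllib.parse import urlsplit, urlunsplit

# Scheme mapping as described above: secure schemes -> wss, plain schemes -> ws.
_SCHEME_MAP = {"https": "wss", "wss": "wss", "http": "ws", "ws": "ws"}


def normalize_uri(uri: str) -> str:
    """Illustrative only: map a user-supplied URI to a WebSocket address."""
    if "://" not in uri:
        # A bare host:port has no scheme, so it resolves to plain ws://.
        return f"ws://{uri}"
    parts = urlsplit(uri)
    scheme = _SCHEME_MAP.get(parts.scheme.lower())
    if scheme is None:
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    return urlunsplit((scheme, parts.netloc, parts.path, parts.query, parts.fragment))


def is_encrypted(uri: str) -> bool:
    """True when the normalized address uses TLS (wss://)."""
    return normalize_uri(uri).startswith("wss://")
```

A check such as is_encrypted(uri) before connecting to a Cloud endpoint is one way to catch the silent downgrade described in the caution above.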

Don't have a pipeline yet? Visit RocketRide on GitHub or download the extension directly in your IDE.

What is RocketRide?

RocketRide is an open-source, developer-native AI pipeline platform. It lets you build, debug, and deploy production AI workflows without leaving your IDE -- using a visual drag-and-drop canvas or code-first with TypeScript and Python SDKs.

  • 50+ ready-to-use nodes - 13 LLM providers, 8 vector databases, OCR, NER, PII anonymization, and more
  • High-performance C++ engine - production-grade speed and reliability
  • Deploy anywhere - locally, on-premises, or self-hosted with Docker
  • MIT licensed - fully open-source, OSI-compliant

You build your .pipe - and you run it against the fastest AI runtime available.

[Image: RocketRide visual canvas builder]

Features

  • Pipeline execution - Start with use(), send data via send(), send_files(), or pipe()
  • Deployments - Persist pipelines server-side and run them on a cron schedule via client.deploy
  • Chat - Conversational AI via chat() and Question
  • Event streaming - Real-time events via on_event and set_events()
  • File upload - send_files() with progress; streaming with pipe()
  • Connection lifecycle - Optional persist mode, reconnection, and callbacks (on_connected, on_disconnected, on_connect_error)
  • Project storage - Save, retrieve, and version-control pipelines on the server
  • Async-first - Built on asyncio and websockets; supports async with context manager
  • CLI included - Manage pipelines from the command line

RocketRideClient

Constructor

RocketRideClient(
    uri: str = "",
    auth: str = "",
    *,
    env: dict = None,
    module: str = None,
    request_timeout: float = None,
    max_retry_time: float = None,
    persist: bool = False,
    on_event = None,
    on_connected = None,
    on_disconnected = None,
    on_connect_error = None,
    on_protocol_message = None,
    on_debug_message = None,
)

Why the options matter: uri and auth tell the client where and how to authenticate. persist and max_retry_time control what happens when the connection fails or the server is not ready yet: with persist=True the client retries with exponential backoff and calls on_connect_error on each failure, so you can show "Still connecting..." or "Connection failed" without implementing retry logic yourself. Use on_disconnected only for "we were connected and then dropped"; use on_connect_error for "failed to connect" or "gave up after max retry time."

| Argument | Type | Required | Description |
| --- | --- | --- | --- |
| uri | str | Yes* | Server URI. *Can be empty if ROCKETRIDE_URI is set in env/.env. |
| auth | str | Yes* | API key. *Can be empty if ROCKETRIDE_APIKEY is set. |
| env | dict | No | Override env; if omitted, .env is loaded. Use when passing config in code instead of env files. |
| module | str | No | Client name for logging. |
| request_timeout | float | No | Default timeout in ms for requests. Prevents a single DAP call from hanging. |
| max_retry_time | float | No | Max time in ms to keep retrying the connection. Set a bound (e.g. 300000) so the app can show "gave up" after a fixed time. |
| persist | bool | No | Enable automatic reconnection. Default: False. Set True for long-lived scripts or UIs. |
| on_event | async callable | No | Called with each server event dict. Use for progress or status updates. |
| on_connected | async callable | No | Called when connection is established. |
| on_disconnected | async callable | No | Called when an established connection is lost (never for a failed first attempt); args: reason, has_error. Do not call disconnect() here if you want auto-reconnect. |
| on_connect_error | callable (message: str) | No | Called on each failed connection attempt. On auth failure the client stops retrying. |
| on_protocol_message | callable (message: str) | No | Optional; for logging raw DAP messages. Helpful when debugging protocol issues. |
| on_debug_message | callable (message: str) | No | Optional; for debug output. |

Raises ValueError if both uri and ROCKETRIDE_URI are empty or if auth is missing and not in env.
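The fallback from constructor arguments to environment variables can be pictured with a small sketch. It assumes that explicit arguments take precedence over ROCKETRIDE_URI and ROCKETRIDE_APIKEY, and it reads os.environ only (the real client also loads .env); resolve_connection is a hypothetical helper, not part of the SDK.

```python
import os
from typing import Mapping, Optional, Tuple


def resolve_connection(
    uri: str = "",
    auth: str = "",
    env: Optional[Mapping[str, str]] = None,
) -> Tuple[str, str]:
    """Illustrative only: explicit arguments first, then ROCKETRIDE_* variables."""
    # Assumption: an explicit env mapping replaces the process environment.
    source = os.environ if env is None else env
    resolved_uri = uri or source.get("ROCKETRIDE_URI", "")
    resolved_auth = auth or source.get("ROCKETRIDE_APIKEY", "")
    if not resolved_uri:
        raise ValueError("uri is empty and ROCKETRIDE_URI is not set")
    if not resolved_auth:
        raise ValueError("auth is empty and ROCKETRIDE_APIKEY is not set")
    return resolved_uri, resolved_auth
```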

Example - client with persist and callbacks:

client = RocketRideClient(
    uri="https://cloud.rocketride.ai",
    auth="my-key",
    persist=True,
    max_retry_time=300000,
    on_connect_error=lambda msg: print("Connect error:", msg),
    on_event=handle_event,
)
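To get a feel for how persist and max_retry_time interact, the sketch below computes an exponential backoff schedule that stays within a retry budget. The base delay, growth factor, and cap are assumptions chosen for illustration; the SDK's actual values are not documented here, and backoff_delays is a hypothetical helper.

```python
from typing import List


def backoff_delays(
    max_retry_time_ms: float,
    base_ms: float = 1000.0,   # assumed first delay
    factor: float = 2.0,       # assumed growth factor
    cap_ms: float = 30000.0,   # assumed upper bound per delay
) -> List[float]:
    """Return the waits (ms) between attempts until the retry budget is spent."""
    delays: List[float] = []
    elapsed = 0.0
    delay = base_ms
    while elapsed + delay <= max_retry_time_ms:
        delays.append(delay)
        elapsed += delay
        # Grow the delay exponentially, but never beyond the cap.
        delay = min(delay * factor, cap_ms)
    return delays
```

Under these assumed parameters a 10-second budget allows three waits (1 s, 2 s, 4 s) before the client would give up and report through on_connect_error.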

Context manager

| Method | Signature | Returns | Description |
| --- | --- | --- | --- |
| __aenter__ | async def __aenter__(self) | self | Enters context; calls connect(). |
| __aexit__ | async def __aexit__(self, exc_type, exc_val, exc_tb) | - | Exits context; calls disconnect(). |

How to use: Prefer async with RocketRideClient(...) as client: so the connection is always closed when you leave the block, even on exception. No need to call disconnect() manually.

Example:

import os

async with RocketRideClient(uri="wss://cloud.rocketride.ai", auth=os.environ["ROCKETRIDE_APIKEY"]) as client:
    result = await client.use(filepath="pipeline.json")
    token = result["token"]
    await client.send(token, "Hello, pipeline!")

Connection

| Method | Signature | Returns | Description |
| --- | --- | --- | --- |
| connect | async def connect(self, uri: str = None, auth: str = None, timeout: float = None) -> None | - | Opens the WebSocket and performs DAP auth. Optional uri/auth override the constructor values for this connection attempt. Optional timeout (ms) bounds the connect + auth handshake (non-persist only). In persist mode, on failure the client calls on_connect_error and retries; on auth failure it does not retry. |
| disconnect | async def disconnect(self) -> None | - | Closes the connection and cancels reconnection. Call when the user disconnects or the script is done. |
| is_connected | def is_connected(self) -> bool | bool | Whether the client is connected. Check before calling use() or send() if needed. |
| set_connection_params | async def set_connection_params(self, uri: str = None, auth: str = None) -> None | - | Updates server URI and/or auth at runtime. If currently connected, disconnects and reconnects with the new params (in persist mode, reconnection is scheduled; otherwise reconnects once). Use when the user changes server or credentials without creating a new client. |
| get_connection_info | def get_connection_info(self) -> dict | dict | Current connection state and URI. Returns { 'connected': bool, 'transport': str, 'uri': str }. Useful for debugging or displaying "Connected to ..." in the UI. |
| get_apikey | def get_apikey(self) -> Optional[str] | str \| None | The API key in use. For debugging only; avoid logging in production. |

Low-level DAP

| Method | Signature | Returns | Description |
| --- | --- | --- | --- |
| build_request | def build_request(self, command: str, *, token: str = None, arguments: dict = None, data: bytes \| str = None) -> dict | dict | Builds a DAP request message. Use for custom commands not covered by use(), send(), etc. |
| request | async def request(self, request: dict, timeout: float = None) -> dict | dict | Sends the request and returns the response. timeout in ms overrides the default for this call. Use did_fail(response) before trusting body. |
| dap_request | async def dap_request(self, command: str, arguments: dict = None, token: str = None, timeout: float = None) -> dict | dict | Shorthand: builds a request and sends it in one call. Equivalent to build_request() + request(). |
| did_fail | def did_fail(self, request: dict) -> bool | bool | Returns True when the response passed in indicates failure (its success field is False). |

Example:

# Two-step (build then request)
req = client.build_request("rrext_monitor", token=token, arguments={"types": ["apaevt_status_upload"]})
res = await client.request(req, timeout=5000)

# One-step with dap_request
res = await client.dap_request("rrext_services", {}, timeout=5000)

if client.did_fail(res):
    raise RuntimeError(res.get("message", "Request failed"))
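The check-then-read pattern above can be wrapped in a small helper. The sketch below works on plain dicts shaped like the DAPMessage type (success, message, body) so it runs without a server; did_fail here is a local stand-in for client.did_fail(), and unwrap_body is a hypothetical name.

```python
from typing import Any, Dict


def did_fail(response: Dict[str, Any]) -> bool:
    """Local stand-in: failure means the success field is explicitly False."""
    return response.get("success") is False


def unwrap_body(response: Dict[str, Any]) -> Dict[str, Any]:
    """Return the response body, or raise with the server's message on failure."""
    if did_fail(response):
        raise RuntimeError(response.get("message", "Request failed"))
    return response.get("body") or {}
```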

Pipeline execution

| Method | Signature | Returns | Description |
| --- | --- | --- | --- |
| use | async def use(self, *, token: str = None, filepath: str = None, pipeline: dict = None, source: str = None, threads: int = None, use_existing: bool = None, args: list = None, ttl: int = None) -> dict | dict | Starts a pipeline. Requires filepath or pipeline. The client substitutes ${ROCKETRIDE_*} from its env. Returns a dict with at least 'token'; use that token for all data and control operations. |
| terminate | async def terminate(self, token: str) -> None | - | Stops the pipeline and frees server resources. |
| get_task_status | async def get_task_status(self, token: str) -> dict | dict | Returns current task status (e.g. completed count, total, state). Poll until completed or use for progress display. |

Why a token: The server runs each pipeline as a separate task. The token identifies that task so send(), send_files(), pipe(), chat(), and get_task_status() target the correct pipeline.

Data

| Method | Signature | Returns | Description |
| --- | --- | --- | --- |
| pipe | async def pipe(self, token: str, objinfo: dict = None, mime_type: str = None, provider: str = None) -> DataPipe | DataPipe | Creates a streaming pipe: open, then one or more write, then close. Use for large or chunked data. Default MIME: 'application/octet-stream'. |
| send | async def send(self, token: str, data: str \| bytes, objinfo: dict = None, mimetype: str = None) -> PIPELINE_RESULT | PIPELINE_RESULT | Sends data in one shot (open pipe, write once, close). Use when you have the full payload in memory. |
| send_files | async def send_files(self, files: List[str \| Tuple[str, dict] \| Tuple[str, dict, str]], token: str) -> List[UPLOAD_RESULT] | List[UPLOAD_RESULT] | Uploads files. Each item: path str, or (path, objinfo), or (path, objinfo, mimetype). Progress via on_event as apaevt_status_upload. |
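The three item shapes that send_files() accepts can be made explicit with a small normalizer. This is a sketch for building and validating the files list on the caller's side; normalize_file_item is a hypothetical helper, and the defaults it fills in (an empty objinfo and a None mimetype) are assumptions rather than documented SDK behavior.

```python
from typing import Optional, Tuple, Union

FileItem = Union[str, Tuple[str, dict], Tuple[str, dict, str]]


def normalize_file_item(item: FileItem) -> Tuple[str, dict, Optional[str]]:
    """Expand any accepted item shape to (path, objinfo, mimetype)."""
    if isinstance(item, str):
        # Plain path: no metadata, MIME type left unspecified.
        return item, {}, None
    if len(item) == 2:
        path, objinfo = item
        return path, objinfo, None
    if len(item) == 3:
        path, objinfo, mimetype = item
        return path, objinfo, mimetype
    raise ValueError(f"unsupported file item: {item!r}")
```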

When to use pipe vs send: Use send() for a single string or bytes. Use pipe() when you read a file in chunks, or when data arrives incrementally.

Example - send a string:

result = await client.send(token, "Hello, pipeline!", objinfo={"name": "greeting.txt"}, mimetype="text/plain")

Example - stream with a pipe:

pipe = await client.pipe(token, mime_type="application/json")
await pipe.open()
await pipe.write(b'{"key": "value1"}')
await pipe.write(b'{"key": "value2"}')
result = await pipe.close()

Events

| Method | Signature | Returns | Description |
| --- | --- | --- | --- |
| set_events | async def set_events(self, token: str, event_types: List[str]) -> None | - | Subscribes this task to the given event types. After this, those events are delivered to on_event. Call after use() when you need upload or processing progress. |
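An on_event handler typically dispatches on the event name. The sketch below assumes each event dict carries "event" and "body" keys, as the examples on this page do; the payloads fed to it are made up for the local demo, since real bodies are defined by the server.

```python
import asyncio
from typing import Any, Dict, List

progress_log: List[str] = []


async def handle_event(event: Dict[str, Any]) -> None:
    """Record upload and processing events; ignore every other type."""
    name = event.get("event")
    body = event.get("body") or {}
    if name == "apaevt_status_upload":
        progress_log.append(f"upload: {body}")
    elif name == "apaevt_status_processing":
        progress_log.append(f"processing: {body}")


async def demo() -> None:
    # Hypothetical payloads, used only to exercise the handler locally.
    await handle_event({"event": "apaevt_status_upload", "body": {"note": "sample"}})
    await handle_event({"event": "apaevt_status_processing", "body": {}})
    await handle_event({"event": "something_else", "body": {}})


asyncio.run(demo())
```

A handler like this would be passed as on_event=handle_event to the constructor, with set_events(token, [...]) selecting which event types the server delivers.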

Services, validation, and ping

| Method | Signature | Returns | Description |
| --- | --- | --- | --- |
| get_services | async def get_services(self) -> dict | dict | Returns all service definitions. Use to discover what the server supports. |
| get_service | async def get_service(self, service: str) -> Optional[dict] | dict \| None | Returns one service by name; None if not found or on error. |
| validate | async def validate(self, pipeline: PipelineConfig, *, source: str = None) -> dict | dict | Validates a pipeline configuration without starting it. Returns validation results (e.g. errors, warnings). Use to check pipeline correctness before use(). |
| ping | async def ping(self, token: str = None) -> None | - | Liveness check; raises on failure. |

Chat

| Method | Signature | Returns | Description |
| --- | --- | --- | --- |
| chat | async def chat(self, *, token: str, question: Question) -> PIPELINE_RESULT | PIPELINE_RESULT | Sends the Question to the AI for the given token and returns the pipeline result. The answer is in the result body; use the schema's answer helpers if you need to parse JSON from the AI text. |

How it works: The client opens a pipe with the question MIME type, writes the serialized Question, closes the pipe, and returns the server result. The pipeline must support the chat provider.

Deploy

Accessed via client.deploy. A deployment persists a pipeline on the server and runs it on a cron schedule (or on demand with "manual"), outliving the client connection. Each deployment is identified by its pipeline's project_id.

| Method | Signature | Returns | Description |
| --- | --- | --- | --- |
| deploy.add | async def add(self, pipeline: PipelineConfig, *, schedule: str \| None = None) -> DeploymentRecord | DeploymentRecord | Persists the pipeline as a deployment and activates it. schedule: 5-field cron ("*/15 * * * *"), preset (@hourly, @daily, …), or "manual" (default). |
| deploy.remove | async def remove(self, project_id: str) -> None | - | Undeploys and removes the deployment. |
| deploy.list | async def list(self) -> list[DeploymentRecord] | list[DeploymentRecord] | Returns the authenticated user's deployments. |
| deploy.status | async def status(self, project_id: str) -> DeploymentRecord | DeploymentRecord | Gets one deployment record. |
| deploy.update | async def update(self, project_id: str, *, pipeline: PipelineConfig \| None = None, schedule: str \| None = None) -> None | - | Replaces the pipeline and/or schedule; omitted parameters stay unchanged. |

States: state is one of 'active' (scheduled runs fire per cron), 'paused', or 'errored'. An 'errored' deployment is one whose scheduled runs could no longer authenticate (e.g. the owner's API key was revoked) and have stopped; remove and re-add the deployment to resume. If a scheduled run is still in progress when the next tick comes due, that tick is skipped, so runs of the same deployment never overlap.
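The skip-on-overlap rule is easy to reason about with a small simulation. The sketch below is a model of the described behavior, not server code: fired_ticks is a hypothetical function, time is in arbitrary units, and it assumes a tick that lands exactly when the previous run ends is allowed to fire.

```python
from typing import List


def fired_ticks(tick_times: List[float], run_duration: float) -> List[float]:
    """Return the ticks that start a run when overlapping ticks are skipped."""
    fired: List[float] = []
    busy_until = float("-inf")
    for tick in sorted(tick_times):
        if tick < busy_until:
            # The previous run is still in progress, so this tick is skipped.
            continue
        fired.append(tick)
        busy_until = tick + run_duration
    return fired
```

With a 15-minute schedule and runs that take 20 minutes, fired_ticks([0, 15, 30, 45], 20) yields [0, 30]: every second tick is dropped rather than queued.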

Example:

record = await client.deploy.add(my_pipeline, schedule="*/15 * * * *")
project_id = record["pipeline"]["project_id"]  # deployments are identified by the pipeline's project_id
for rec in await client.deploy.list():
    print(rec["pipeline"]["project_id"], rec["schedule"], rec["state"])
await client.deploy.update(project_id, schedule="manual")  # pause scheduled runs
await client.deploy.remove(project_id)

DataPipe

Returned by await client.pipe(...). One streaming upload: open -> write (one or more) -> close. You can also use it as an async context manager: entering calls open(), exiting calls close().

| Property | Type | Description |
| --- | --- | --- |
| is_opened | bool | Whether the pipe is open. |
| pipe_id | int \| None | Server-assigned pipe ID after open(). |

| Method | Signature | Returns | Description |
| --- | --- | --- | --- |
| open | async def open(self) -> DataPipe | self | Opens the pipe; required before write(). |
| write | async def write(self, buffer: bytes) -> None | - | Writes a chunk. Pipe must be open. |
| close | async def close(self) -> PIPELINE_RESULT | PIPELINE_RESULT | Closes the pipe and returns the processing result. |
| __aenter__ | async def __aenter__(self) | self | Enters context; calls open(). |
| __aexit__ | async def __aexit__(self, exc_type, exc_val, exc_tb) | - | Exits context; calls close(). |
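The open/write/close protocol and its context-manager form can be tried without a server by using a stand-in object. FakePipe below is a hypothetical class that only mimics the documented method names and call order; it is not the SDK's DataPipe, and the dict returned by its close() is made up.

```python
import asyncio
from typing import List


class FakePipe:
    """Hypothetical stand-in that mimics the DataPipe open/write/close protocol."""

    def __init__(self) -> None:
        self.is_opened = False
        self.chunks: List[bytes] = []
        self.calls: List[str] = []

    async def open(self) -> "FakePipe":
        self.is_opened = True
        self.calls.append("open")
        return self

    async def write(self, buffer: bytes) -> None:
        if not self.is_opened:
            raise RuntimeError("pipe is not open")
        self.chunks.append(buffer)
        self.calls.append("write")

    async def close(self) -> dict:
        self.is_opened = False
        self.calls.append("close")
        return {"bytes": sum(len(c) for c in self.chunks)}

    async def __aenter__(self) -> "FakePipe":
        return await self.open()  # entering the context opens the pipe

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.close()  # leaving the context closes it, even on error


async def upload(pipe: FakePipe) -> None:
    async with pipe:
        await pipe.write(b"a,b\n")
        await pipe.write(b"1,2\n")


pipe = FakePipe()
asyncio.run(upload(pipe))
print(pipe.calls)
```

In this sketch the value returned by close() is discarded when the context exits; when you need the PIPELINE_RESULT, the explicit open, write, close sequence returns it directly.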

Question

From rocketride.schema. Build a question for client.chat(token=..., question=question). Add instructions, examples, context, history, and documents to steer the AI.

Constructor

Question(
    type: QuestionType = QuestionType.QUESTION,
    filter: DocFilter = None,
    expectJson: bool = False,
    role: str = '',
)

QuestionType values: QUESTION, SEMANTIC, KEYWORD, GET, PROMPT. Defaults when omitted: type=QuestionType.QUESTION, filter=None, expectJson=False, role=''.

Methods

| Method | Signature | Description |
| --- | --- | --- |
| addInstruction | addInstruction(self, title: str, instruction: str) | Adds an instruction (e.g. "Use bullet points"). |
| addExample | addExample(self, given: str, result: dict \| list \| str) | Adds an example input/output; result can be dict/list (JSON-serialized). |
| addContext | addContext(self, context: str \| dict \| List[str] \| List[dict]) | Adds context. |
| addHistory | addHistory(self, item: QuestionHistory) | Adds a history item for multi-turn chat. |
| addQuestion | addQuestion(self, question: str) | Appends the question text. |
| addDocuments | addDocuments(self, documents: Doc \| List[Doc]) | Adds documents for the AI to reference. |
| getPrompt | getPrompt(self, has_previous_json_failed: bool = False) -> str | Returns the full prompt (internal). |

Answer

From rocketride.schema. Used to parse chat response content. The client does not attach an Answer instance to the pipeline result; you read the response body and, if needed, use these helpers to extract JSON or code from AI text (which often includes markdown or code fences).

| Method | Signature | Description |
| --- | --- | --- |
| getText | getText(self) -> str | Get the answer as plain text. |
| getJson | getJson(self) -> Optional[dict] | Get the answer as parsed JSON; returns None if not valid JSON. |
| isJson | isJson(self) -> bool | Whether the answer contains valid JSON. |
| parseJson | parseJson(self, value: str) -> Any | Parses JSON from AI text (strips markdown/code blocks). |
| parsePython | parsePython(self, value: str) -> Any | Extracts Python code from a code block in the response. |
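To show why helpers such as parseJson are useful, here is a minimal standard-library sketch of pulling JSON out of AI text that wraps it in a code fence. It illustrates the general idea only and is not how the SDK implements parseJson; extract_json is a hypothetical name.

```python
import json
import re
from typing import Any

# Build the three-backtick fence marker programmatically to keep this listing readable.
FENCE = "`" * 3
_FENCED = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL | re.IGNORECASE)


def extract_json(text: str) -> Any:
    """Parse JSON from text, unwrapping a fenced block when one is present."""
    match = _FENCED.search(text)
    candidate = match.group(1) if match else text
    # json.loads raises json.JSONDecodeError (a ValueError) on invalid input.
    return json.loads(candidate.strip())


reply = "Here you go:\n" + FENCE + 'json\n{"summary": "ok", "keywords": ["a"]}\n' + FENCE
print(extract_json(reply))
```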

Types

  • PIPELINE_RESULT: TypedDict with name, path, objectId, optional result_types, and dynamic fields.
  • UPLOAD_RESULT: Per-file result with action, filepath, error?, result?, upload_time?, etc.
  • TASK_STATUS: Task status with completedCount, totalCount, completed, state, exitCode, and many more fields.
  • DAPMessage: Dict with type, seq, and optional command, arguments, body, success, message, event, token, etc.
  • PipelineConfig: Pipeline definition with name, description, version, components, source, project_id.
  • DeploymentRecord: TypedDict with pipeline, schedule, state ('active' | 'paused' | 'errored'), userId, createdAt, updatedAt (Unix seconds).
  • QuestionHistory: { 'role': str, 'content': str }.
  • QuestionInstruction: { 'subtitle': str, 'instructions': str }.
  • QuestionExample: { 'given': str, 'result': str }.

Exceptions

The exception hierarchy provides fine-grained error handling:

DAPException                          # Base DAP protocol error (has dap_result dict)
└── RocketRideException               # Base for all RocketRide errors
    ├── ConnectionException           # Connection/network issues
    │   └── AuthenticationException   # Bad API key or credentials
    ├── PipeException                 # Data pipe errors (open/write/close)
    ├── ExecutionException            # Pipeline start/run failures
    └── ValidationException           # Invalid input/config

All exceptions expose a dap_result dict with detailed server error context.

AuthenticationException is raised on DAP auth failure. In persist mode the client catches it, calls on_connect_error, and does not retry, so the app can fix credentials and call connect() again.
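Because AuthenticationException sits below ConnectionException in the hierarchy, the order of except clauses matters: the more specific class has to come first. The sketch below mirrors part of the tree with local stand-in classes so it runs without the SDK; the class bodies and the classify helper are illustrative assumptions.

```python
from typing import Optional


# Local stand-ins that mirror the documented hierarchy (not the SDK classes).
class DAPException(Exception):
    def __init__(self, message: str, dap_result: Optional[dict] = None) -> None:
        super().__init__(message)
        self.dap_result = dap_result or {}


class RocketRideException(DAPException):
    pass


class ConnectionException(RocketRideException):
    pass


class AuthenticationException(ConnectionException):
    pass


def classify(exc: Exception) -> str:
    """Map an error to a recovery hint, most specific class first."""
    try:
        raise exc
    except AuthenticationException:
        return "fix credentials"      # must precede ConnectionException
    except ConnectionException:
        return "retry later"
    except RocketRideException:
        return "other SDK error"
```

If the ConnectionException clause came first, it would also catch authentication failures and the credentials hint would never be reached.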

Example:

from rocketride import RocketRideClient, AuthenticationException
from rocketride.core.exceptions import PipeException, ExecutionException

try:
    async with RocketRideClient(uri=uri, auth=auth) as client:
        result = await client.use(filepath="pipeline.json")
        await client.send(result["token"], data)
except AuthenticationException:
    print("Bad credentials")
except ExecutionException as e:
    print(f"Pipeline failed: {e}")
except PipeException as e:
    print(f"Data transfer error: {e}")

Examples (Full API Usage)

1. Minimal: connect, run pipeline from file, send one string, disconnect

import asyncio
from rocketride import RocketRideClient

async def main():
    client = RocketRideClient(uri="https://cloud.rocketride.ai", auth="my-key")
    await client.connect()
    result = await client.use(filepath="pipeline.json")
    token = result["token"]
    out = await client.send(token, "Hello, pipeline!", objinfo={"name": "input.txt"}, mimetype="text/plain")
    print(out)
    await client.terminate(token)
    await client.disconnect()

asyncio.run(main())

2. Context manager: run an inline pipeline configuration and check status

import asyncio
from rocketride import RocketRideClient

async def main():
    async with RocketRideClient(uri="wss://cloud.rocketride.ai", auth="my-key") as client:
        # my_pipeline_config is a placeholder for a pipeline definition built elsewhere.
        result = await client.use(pipeline={"pipeline": my_pipeline_config})
        token = result["token"]
        await client.send(token, '{"data": 1}')
        status = await client.get_task_status(token)
        print(status)
        await client.terminate(token)

asyncio.run(main())

3. Long-lived app: persist mode, callbacks, and status handling

import asyncio
from rocketride import RocketRideClient

# on_event, on_connected, and on_disconnected are documented as async callables.
async def on_event(event):
    print(event.get("event"), event.get("body"))

async def on_connected(info):
    print("Connected:", info)

async def on_disconnected(reason, has_error):
    print("Disconnected:", reason, has_error)

async def main():
    client = RocketRideClient(
        uri="https://cloud.rocketride.ai",
        auth="my-key",
        persist=True,
        max_retry_time=300000,
        on_connected=on_connected,
        on_disconnected=on_disconnected,
        on_connect_error=lambda msg: print("Connect error:", msg),
        on_event=on_event,
    )
    await client.connect()
    # Later: use(), send_files(), etc. If connection drops, client retries; do not call disconnect() in on_disconnected.

asyncio.run(main())

4. Upload multiple files and poll until pipeline completes

import asyncio
from pathlib import Path
from rocketride import RocketRideClient

async def main():
    client = RocketRideClient(uri="https://cloud.rocketride.ai", auth="my-key")
    await client.connect()
    result = await client.use(filepath="vectorize.json")
    token = result["token"]
    await client.set_events(token, ["apaevt_status_upload", "apaevt_status_processing"])

    files = ["doc1.md", "doc2.md", ("doc3.json", {"tag": "export"}, "application/json")]
    upload_results = await client.send_files(files, token)
    for r in upload_results:
        if r["action"] == "complete":
            print("OK", r["filepath"])
        else:
            print("Failed", r["filepath"], r.get("error"))

    while True:
        status = await client.get_task_status(token)
        print(f"Progress: {status.get('completedCount', 0)}/{status.get('totalCount', 0)}")
        if status.get("completed"):
            break
        await asyncio.sleep(2)
    await client.terminate(token)
    await client.disconnect()

asyncio.run(main())

5. Streaming large data with a pipe

import asyncio
from rocketride import RocketRideClient

async def main():
    async with RocketRideClient(uri="https://cloud.rocketride.ai", auth="my-key") as client:
        result = await client.use(filepath="ingest.json")
        token = result["token"]
        pipe = await client.pipe(token, objinfo={"name": "large.csv"}, mime_type="text/csv")
        await pipe.open()
        with open("large.csv", "rb") as f:
            while True:
                chunk = f.read(64 * 1024)
                if not chunk:
                    break
                await pipe.write(chunk)
        result = await pipe.close()
        print(result)
        await client.terminate(token)

asyncio.run(main())

6. Chat: question with instructions and examples, parse JSON answer

import asyncio
from rocketride import RocketRideClient
from rocketride.schema import Question, Answer

async def main():
    async with RocketRideClient(uri="https://cloud.rocketride.ai", auth="my-key") as client:
        result = await client.use(filepath="chat_pipeline.json")
        token = result["token"]
        question = Question(expectJson=True)
        question.addInstruction("Format", "Return a JSON object with keys: summary, keywords.")
        question.addExample("Summarize X", {"summary": "...", "keywords": ["a", "b"]})
        question.addQuestion("Summarize the main points and list keywords.")
        response = await client.chat(token=token, question=question)
        answer_text = response.get("data", {}).get("answer") or (response.get("answers") or [None])[0]
        structured = Answer().parseJson(answer_text) if answer_text else None
        print(structured)
        await client.terminate(token)

asyncio.run(main())

7. Discover services and send a custom DAP request

import asyncio
from rocketride import RocketRideClient

async def main():
    client = RocketRideClient(uri="https://cloud.rocketride.ai", auth="my-key")
    await client.connect()
    services = await client.get_services()
    print("Available:", list(services.keys()))
    ocr = await client.get_service("ocr")
    if ocr:
        print("OCR schema:", ocr.get("schema"))
    # my_token is a placeholder for the token of a running task (returned by use()).
    req = client.build_request("rrext_ping", token=my_token)
    res = await client.request(req, timeout=5000)
    if client.did_fail(res):
        raise RuntimeError(res.get("message", "Ping failed"))
    await client.disconnect()

asyncio.run(main())

CLI

The rocketride command is installed automatically with the package.

rocketride start pipeline.json              # Start a pipeline
rocketride upload *.pdf --token <token>     # Upload files to a running pipeline
rocketride status --token <token>           # Monitor task progress
rocketride stop --token <token>             # Terminate a running task
rocketride list                             # List all active tasks
rocketride events ALL --token <token>       # Stream task events
rocketride rrext_store get_all_projects     # List stored projects

All commands accept --uri and --apikey flags, or read from environment variables.

Configuration

| Variable | Description |
| --- | --- |
| ROCKETRIDE_URI | Server URI (e.g. wss://cloud.rocketride.ai or ws://localhost:5565) |
| ROCKETRIDE_APIKEY | API key for authentication |

License

MIT - see LICENSE.