Skip to main content

Send / Send Files / Pipe

View as Markdown

Overview

RocketRide provides three methods for sending data to a running pipeline:

MethodUse Case
send()Send a single piece of text or binary data
send_files() / sendFiles()Upload one or more files with progress tracking
pipe() / DataPipeStream large datasets in chunks

All three methods require a token from a previously started pipeline (via use()).


send()

Send text or binary data to a running pipeline and get the processing result.

Method Signature

Python (async):

result = await client.send(token, data, objinfo=None, mimetype=None)

TypeScript:

const result = await client.send(token, data, objinfo?, mimetype?);

Parameters

ParameterTypeRequiredDescription
tokenstr / stringYesPipeline task token from use()
datastr | bytes / string | Uint8ArrayYesData to send for processing
objinfodict / Record<string, unknown>NoMetadata about the data (e.g., {"name": "data.txt"})
mimetypestr / stringNoMIME type of the data (auto-detected if not specified)

Returns

  • Type: PIPELINE_RESULT, dictionary containing the pipeline's processing output

Examples

from rocketride import RocketRideClient

async with RocketRideClient(auth='your-api-key') as client:
result = await client.use(filepath='text_analyzer.json')
token = result['token']

# Send text data
response = await client.send(token, 'Analyze this text for sentiment')
print(response)

# Send JSON data
import json
response = await client.send(
token,
json.dumps({'name': 'John', 'age': 30}),
mimetype='application/json',
)

# Send binary data
with open('data.bin', 'rb') as f:
response = await client.send(token, f.read(), mimetype='application/octet-stream')
import { RocketRideClient } from 'rocketride';

const client = new RocketRideClient({ auth: 'your-api-key' });
await client.connect();

const result = await client.use({ filepath: './text_analyzer.json' });

// Send string data
const response = await client.send(result.token, 'Analyze this text');

// Send binary data
const encoder = new TextEncoder();
const buffer = encoder.encode('Binary data');
const response2 = await client.send(result.token, buffer, { name: 'data.txt' });

await client.disconnect();

send_files() / sendFiles()

Upload multiple files to a pipeline with parallel transfers and progress tracking.

Method Signature

Python (async):

results = await client.send_files(files, token)

TypeScript:

const results = await client.sendFiles(files, token);

Parameters

Python:

ParameterTypeRequiredDescription
fileslistYesList of file paths or tuples (see formats below)
tokenstrYesPipeline task token

Each file entry can be:

  • "path/to/file.pdf": just a file path
  • ("path/to/file.pdf", {"category": "doc"}): file path with metadata
  • ("path/to/file.pdf", {"name": "doc"}, "application/pdf"): file path with metadata and MIME type

TypeScript:

ParameterTypeRequiredDescription
filesArray<{ file: File, objinfo?, mimetype? }>YesArray of File objects with optional metadata
tokenstringYesPipeline task token

Returns

  • Type: UPLOAD_RESULT[], array of upload results, one per file

Each result contains:

FieldTypeDescription
actionstr'complete' or 'error'
filepathstrFile path/name
bytes_sentintBytes transferred
file_sizeintTotal file size
upload_timefloatUpload duration in seconds
resultdictPipeline processing result (if successful)
errorstrError message (if failed)

Examples

from rocketride import RocketRideClient

async with RocketRideClient(auth='your-api-key') as client:
result = await client.use(filepath='document_processor.json')
token = result['token']

# Simple file list
files = ['document1.pdf', 'data.csv', 'report.docx']
results = await client.send_files(files, token)

for r in results:
if r['action'] == 'complete':
print(f"Uploaded {r['filepath']} in {r['upload_time']:.2f}s")
else:
print(f"Failed {r['filepath']}: {r['error']}")

# With metadata and MIME types
files = [
('report.pdf', {'department': 'finance'}),
('data.csv', {'type': 'quarterly'}, 'text/csv'),
]
results = await client.send_files(files, token)
const files = [
{ file: fileObject1, mimetype: 'application/pdf' },
{ file: fileObject2, objinfo: { department: 'finance' } },
];
const results = await client.sendFiles(files, result.token);

Progress Events

If you configure an event handler, you'll receive upload progress events:

async def handle_events(event):
if event['event'] == 'apaevt_status_upload':
body = event['body']
if body['action'] == 'write':
pct = (body['bytes_sent'] / body['file_size']) * 100
print(f"{body['filepath']}: {pct:.1f}%")

client = RocketRideClient(auth='your-api-key', on_event=handle_events)

pipe() / DataPipe

Create a streaming data pipe for sending large datasets in chunks.

Method Signature

Python (async):

pipe = await client.pipe(token, objinfo=None, mime_type=None, provider=None)

TypeScript:

const pipe = await client.pipe(token, objinfo?, mimeType?, provider?);

Parameters

ParameterTypeRequiredDescription
tokenstr / stringYesPipeline task token
objinfodict / Record<string, unknown>NoMetadata about the data
mime_type / mimeTypestr / stringNoMIME type of the data
providerstr / stringNoOptional provider specification

DataPipe Methods

MethodDescription
open()Open the pipe for writing (must call before write())
write(buffer)Write a chunk of data (bytes in Python, Uint8Array in TypeScript)
close()Close the pipe and get processing results

Examples

Using context manager (Python, recommended):

async with await client.pipe(token, mime_type='text/csv') as pipe:
for chunk in csv_chunks:
await pipe.write(chunk.encode())
result = await pipe.close()

Manual open/write/close:

pipe = await client.pipe(token, mime_type='application/json')
await pipe.open()
await pipe.write(json.dumps(data_part1).encode())
await pipe.write(json.dumps(data_part2).encode())
result = await pipe.close()
const pipe = await client.pipe(token, {}, 'text/csv');
await pipe.open();
await pipe.write(new TextEncoder().encode('header1,header2\n'));
await pipe.write(new TextEncoder().encode('value1,value2\n'));
const result = await pipe.close();

Response Format

Both send() and DataPipe.close() return a PIPELINE_RESULT: the output from the pipeline's processing. The exact structure depends on your pipeline configuration, but it typically includes fields like:

{
"name": "document.pdf",
"path": "/processed/document.pdf",
"objectId": "obj-12345",
"result_types": ["text", "metadata"],
"text": ["Extracted text content..."],
"metadata": { ... }
}

Error Handling

ErrorCause
ValueError / ErrorInvalid data type (not string/bytes)
RuntimeError / ErrorPipeline not running, send failed, or pipe operation failed
FileNotFoundErrorFile path in send_files() doesn't exist
try:
result = await client.send(token, data)
except RuntimeError as e:
print(f'Send failed: {e}')

API Endpoint

These methods communicate via the RocketRide DAP protocol over WebSocket. The equivalent HTTP endpoint for data operations is:

  • Method: POST /task/data
  • Query Parameters: token={token}

The /webhook path is also available as an alias for POST /task/data.