HTTP External Stream

You can send data to HTTP endpoints via the HTTP External Stream. You can use this feature to trigger Slack notifications or send streaming data to downstream systems, such as Splunk, Datadog, Elasticsearch, Databricks, or any other HTTP-based service.

Currently, HTTP external streams only support writing data to HTTP endpoints; reading data from HTTP endpoints is not yet supported.

Create HTTP External Stream

To create an external stream for HTTP endpoints, you can run the following DDL SQL:

CREATE EXTERNAL STREAM [IF NOT EXISTS] name
(<col_name1> <col_type1>, <col_name2> <col_type2>, ...)
SETTINGS
type = 'http',
url = '', -- the HTTP URL the external stream writes data to
data_format = '..', -- case-sensitive; besides common formats such as JSONEachRow, it also supports OpenSearch and ElasticSearch. Use Template to create customized payloads
write_method = 'POST', -- optional, the HTTP method for write, default to POST
compression_method = 'none', -- optional, the method for handling the request/response body
use_chunked_encoding = true, -- optional, use Chunked Transfer Encoding for sending data
http_header_..= '..', -- optional, HTTP header key-value pairs
-- optional auth settings
username = '..', -- username for HTTP basic authentication (conflicts with http_header_Authorization)
password = '..', -- password for HTTP basic authentication
-- settings for Template format
format_template_resultset_format = '..',
format_template_row_format = '..',
format_template_rows_between_delimiter = '..', --default to '\n'
-- optional SSL related settings
ssl_ca_cert_file = '..', -- the path of the CA certificate file
ssl_ca_pem = '..', -- the content of the CA certificate file (in PEM format), conflicts with ssl_ca_cert_file
skip_ssl_cert_check = true|false, -- optional, default to false. Set it to true for self-signed certificates
client_key = '..', -- the client's private key
-- timeout settings
http_keep_alive_timeout = 10, -- timeout (in seconds) for HTTP keep-alive connection
send_timeout = 300, -- seconds
receive_timeout = 300 -- seconds

For the full list of settings, see the DDL Settings section.

Examples

Write to Algolia

The Algolia Ingestion API accepts multiple rows in a single request with the following JSON payload:

{
  "action":"addObject",
  "records":[
    {"objectID":"id1","column1":"value1","column2":"value2"},
    {"objectID":"id2","column1":"value1","column2":"value2"}
  ]
}

The objectID needs to be unique for each record. Assuming each document contains three columns: firstname, lastname, and zip_code, you can create the HTTP external stream as follows:

CREATE EXTERNAL STREAM http_algolia_t1 (
objectID string default uuid(),
firstname string,
lastname string,
zip_code int32)
SETTINGS
type = 'http',
http_header_x_algolia_application_id='..',
http_header_x_algolia_api_key='..',
url = 'https://data.us.algolia.com/2/tasks/../push',
data_format = 'Template',
format_template_resultset_format='{"action":"addObject","records":[${data}]}',
format_template_row_format='{"objectID":${objectID:JSON},"firstname":${firstname:JSON},"lastname":${lastname:JSON},"zip_code":${zip_code:JSON}}',
format_template_rows_between_delimiter=','

Follow the Algolia documentation to create a "Push to Algolia" connector and obtain the task ID, as well as the application ID and API key.

You can insert multiple rows into the Algolia index via the INSERT command, or continuously via materialized views (see the sketch after the INSERT example):

INSERT INTO http_algolia_t1(firstname,lastname,zip_code)
VALUES('firstnameA','lastnameA',123),('firstnameB','lastnameB',987)
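
To push rows continuously rather than through ad-hoc INSERT statements, a materialized view can write its results into the external stream. A minimal sketch, assuming a hypothetical source stream user_signups with matching columns:

CREATE MATERIALIZED VIEW mv_push_to_algolia
INTO http_algolia_t1 AS
SELECT firstname, lastname, zip_code
FROM user_signups

Each new event in user_signups is then pushed to Algolia, with objectID filled in by the default uuid() expression.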

DDL Settings

type

The type of the external stream. The value must be http to send data to HTTP endpoints.

config_file

The config_file setting is available since Timeplus Enterprise 2.7. You can specify the path to a file that contains the configuration settings, written as key=value pairs, one pair per line. You can set the HTTP credentials or authentication tokens in the file.

Please follow the example in Kafka External Stream.
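
As a sketch, assuming the credentials live in a hypothetical file /etc/timeplus/http_auth.conf containing lines such as username=.. and password=.., the DDL only needs to reference that file (the endpoint and column below are placeholders):

CREATE EXTERNAL STREAM http_with_config (raw string)
SETTINGS
type = 'http',
url = 'https://example.com/ingest',
data_format = 'JSONEachRow',
config_file = '/etc/timeplus/http_auth.conf'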

url

The endpoint of the HTTP service. Different services and different use cases may have different endpoints. For example, to send data to a specified OpenSearch index, you can use http://host:port/my_index/_bulk. To send data to multiple indexes (depending on the column in the streaming SQL), you can use http://host:port/_bulk and also specify the output_format_opensearch_index_column.
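
For instance, a sketch of an external stream that writes to multiple OpenSearch indexes through the _bulk endpoint, assuming a hypothetical column _index carries the target index name and output_format_opensearch_index_column is set to that column:

CREATE EXTERNAL STREAM http_opensearch_bulk (
_index string,
level string,
message string)
SETTINGS
type = 'http',
url = 'http://host:port/_bulk',
data_format = 'OpenSearch',
output_format_opensearch_index_column = '_index',
username = '..',
password = '..'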

data_format

The data_format setting specifies how the HTTP POST body is constructed. We have built special integrations with OpenSearch and ElasticSearch, enabled with data_format='OpenSearch' or data_format='ElasticSearch'.

Other supported values for data_format are:

  • JSONEachRow: each row of the message as a single JSON document. The top-level JSON key/value pairs will be parsed or constructed as the columns. If you need the POST body to be a JSON array for multiple rows in a batch, also set output_format_json_array_of_rows=1 in the DDL. If you need each POST body to represent a single row as a JSON object, set one_message_per_row=true (see the sketch after this list).
  • CSV: less commonly used.
  • TSV: similar to CSV, but with tab as the separator
  • ProtobufSingle: a single Protobuf message per message
  • Protobuf: there could be multiple Protobuf messages in a single message.
  • Avro
  • RawBLOB: the default value. Read/write messages as plain text.
  • Template: create the payload with customized templates
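
As a sketch of the one_message_per_row option mentioned in the JSONEachRow item above, assuming a hypothetical endpoint that expects exactly one JSON object per request (the URL and columns are placeholders):

CREATE EXTERNAL STREAM http_single_json (
level string,
message string)
SETTINGS
type = 'http',
url = 'https://example.com/ingest',
data_format = 'JSONEachRow',
one_message_per_row = true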

format_template_resultset_format

When you set data_format to Template, you need to specify both format_template_resultset_format and format_template_row_format, and optionally format_template_rows_between_delimiter.

format_template_resultset_format controls how all the rows are combined to create the POST payload, and supports the following placeholders:

  • data is the rows with data in format_template_row_format, separated by format_template_rows_between_delimiter.
  • totals is the row with total values.
  • min is the row with minimum values.
  • max is the row with maximum values.
  • rows_before_limit is the minimal number of rows there would have been without LIMIT.
  • time is the request execution time in seconds.
  • rows_read is the number of rows that have been read.
  • bytes_read is the number of bytes (uncompressed) that have been read.

For example:

  • To send markdown messages to Slack, the template is '{"blocks":[{"type":"section","text":{"type":"mrkdwn","text":"${data}"}}]}'
  • To send rows to BigQuery, the template is '{"rows":[${data}]}'

format_template_row_format

The setting specifies format strings for rows with the following syntax:

delimiter_1${column_1:serializeAs_1}delimiter_2${column_2:serializeAs_2} ... delimiter_N

For example:

  • To send markdown messages to Slack, the template is '${text:Raw}'
  • To send rows to BigQuery, the template is '{"json":{"num":${num:JSON},"str":${str:JSON}}}' (replace the column names per your table schema)

The following escaping rules are supported:

Rules           Description
CSV, JSON, XML  Similar to the formats of the same names
Escaped         Similar to TSV
Quoted          Similar to Values
Raw             Without escaping, similar to TSVRaw
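
Putting the two Slack templates from the examples above together, a complete external stream could look like the following sketch (the incoming webhook URL is a placeholder and text is the only column; the Raw rule keeps the markdown unescaped):

CREATE EXTERNAL STREAM http_slack_notify (
text string)
SETTINGS
type = 'http',
url = 'https://hooks.slack.com/services/..',
data_format = 'Template',
format_template_resultset_format = '{"blocks":[{"type":"section","text":{"type":"mrkdwn","text":"${data}"}}]}',
format_template_row_format = '${text:Raw}'

An INSERT INTO http_slack_notify(text) VALUES('Hello from Timeplus') then posts a markdown message to the channel behind the webhook.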

format_template_rows_between_delimiter

The format_template_rows_between_delimiter setting specifies the delimiter between rows, which is printed (or expected) after every row except the last one (\n by default).

http_header_Authorization

You can set key/value pairs in HTTP headers, using the format http_header_key=value. For example, to set the Authorization header for Splunk HEC, you can use http_header_Authorization=Splunk hec_token.
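
For example, a sketch of an external stream that forwards events to Splunk HEC, assuming the typical /services/collector/event endpoint on port 8088 and a placeholder token; the single event column carries the event payload as a string:

CREATE EXTERNAL STREAM http_splunk_hec (
event string)
SETTINGS
type = 'http',
url = 'https://splunk-host:8088/services/collector/event',
data_format = 'JSONEachRow',
http_header_Authorization = 'Splunk hec_token'

With JSONEachRow, each row is serialized as {"event":".."}, which matches the HEC event envelope.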

output_format_json_array_of_rows

If you need the POST body to be a JSON array for multiple rows in a batch, set output_format_json_array_of_rows=1. For example:

CREATE EXTERNAL STREAM http_openobserve_t1 (
level string,
job string,
log string
) SETTINGS
type = 'http',
data_format = 'JSONEachRow',
output_format_json_array_of_rows = 1,
username='..',
password='..',
url = 'https://api.openobserve.ai/api/../default/_json'
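
With this setting, an INSERT like the sketch below is sent as a single JSON array, e.g. [{"level":"info","job":"job1","log":".."},{"level":"warn","job":"job1","log":".."}], rather than as newline-delimited JSON objects:

INSERT INTO http_openobserve_t1(level, job, log)
VALUES('info','job1','started'),('warn','job1','retrying')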