Vector Search
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.


Supported in: Batch and Streaming

Flink SQL provides the VECTOR_SEARCH table-valued function (TVF) to perform vector search in SQL queries. It lets you find the rows of a table that are most similar to a given row, based on high-dimensional vectors.

VECTOR_SEARCH Function

VECTOR_SEARCH uses a processing-time attribute to correlate each input row with the latest version of the data in an external table. It is similar to a lookup join in Flink SQL; the difference is that VECTOR_SEARCH compares the vector of the input row with the vectors stored in the external table and returns the top-k most similar rows.
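For comparison, here is a minimal sketch of the two query shapes. All table and column names (orders, customers, proc_time, queries, query_vector, documents, embedding) are hypothetical, and documents is assumed to be backed by a source that supports vector search. The lookup join fetches the row whose key equals the input key, whereas VECTOR_SEARCH ranks the rows of the external table by vector similarity and keeps the top k.

```sql
-- Lookup join: for each order, fetch the customer row whose id equals
-- o.customer_id, as of the order's processing time (hypothetical tables)
SELECT o.order_id, c.name
FROM orders AS o
  JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

-- Vector search: for each query row, fetch the 5 rows of documents whose
-- embedding column is most similar to query_vector (hypothetical tables)
SELECT *
FROM queries, LATERAL TABLE(VECTOR_SEARCH(
  TABLE documents,
  queries.query_vector,
  DESCRIPTOR(embedding),
  5
));
```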

Syntax

SELECT * 
FROM input_table, LATERAL TABLE(VECTOR_SEARCH(
   TABLE vector_table, 
   input_table.vector_column, 
   DESCRIPTOR(index_column),
   top_k
   [, CONFIG => MAP['key', 'value']]
   ))

Parameters

  • input_table: The input table containing the data to be processed.
  • vector_table: The name of the external table that supports vector search.
  • vector_column: The column of the input table that holds the query vector. Its type should be ARRAY<FLOAT> or ARRAY<DOUBLE>.
  • index_column: A descriptor specifying which column of the vector table is compared with the input vector to compute similarity.
  • top_k: The number of most similar rows to return for each input row.
  • config: (Optional) A map of configuration options for the vector search.
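The sketch below maps each parameter to a concrete call. It assumes a hypothetical documents table backed by a VectorSearchTableSource with an embedding column, and a temporary view queries that supplies two-dimensional ARRAY<FLOAT> query vectors; the vector dimension is assumed to match that of embedding.

```sql
-- Hypothetical input table: one query vector per row, typed ARRAY<FLOAT>
CREATE TEMPORARY VIEW queries AS
SELECT *
FROM (
  VALUES
    (1, ARRAY[CAST(0.1 AS FLOAT), CAST(0.9 AS FLOAT)]),
    (2, ARRAY[CAST(0.7 AS FLOAT), CAST(0.3 AS FLOAT)])
) AS t(query_id, query_vector);

SELECT *
FROM queries, LATERAL TABLE(VECTOR_SEARCH(
  TABLE documents,          -- vector_table (hypothetical)
  queries.query_vector,     -- vector_column: ARRAY<FLOAT>
  DESCRIPTOR(embedding),    -- index_column of documents (hypothetical)
  3                         -- top_k: the 3 most similar rows per query row
));
```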

Configuration Options

The following configuration options can be specified in the config map:

  • async
    Default: (none). Type: Boolean.
    Whether to use an asynchronous ('true') or synchronous ('false') vector search function. The value is a hint to the planner; if the search function provider does not support the requested mode, an exception is thrown to notify the user.
  • max-concurrent-operations
    Default: (none). Type: Integer.
    The maximum number of asynchronous I/O operations that the asynchronous vector search call can trigger.
  • output-mode
    Default: (none). Type: Enum.
    Output mode for asynchronous vector search operations, converted to AsyncDataStream.OutputMode; ORDERED is used by default. If set to ALLOW_UNORDERED, the planner attempts to use AsyncDataStream.OutputMode.UNORDERED when doing so does not affect the correctness of the result; otherwise ORDERED is still used.
    Possible values: "ORDERED", "ALLOW_UNORDERED".
  • timeout
    Default: (none). Type: Duration.
    Timeout from the first invocation to the final completion of an asynchronous vector search operation. It may include multiple retries and is reset in case of failover.
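As an illustration, the sketch below sets all four options through the named-parameter form, reusing the placeholder names from the Syntax section. The values are illustrative only, not recommendations; per the async option above, an exception is thrown if the vector table's search function provider does not support asynchronous search.

```sql
SELECT *
FROM input_table, LATERAL TABLE(VECTOR_SEARCH(
  SEARCH_TABLE => TABLE vector_table,
  COLUMN_TO_QUERY => input_table.vector_column,
  COLUMN_TO_SEARCH => DESCRIPTOR(index_column),
  TOP_K => 5,
  CONFIG => MAP[
    'async', 'true',                     -- request the asynchronous search function
    'max-concurrent-operations', '10',   -- cap on async I/O operations the call can trigger
    'output-mode', 'ALLOW_UNORDERED',    -- unordered output only where it is safe
    'timeout', '30s'                     -- first invocation to completion, retries included
  ]
));
```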

Example

-- Basic usage
SELECT * FROM 
input_table, LATERAL TABLE(VECTOR_SEARCH(
  TABLE vector_table,
  input_table.vector_column,
  DESCRIPTOR(index_column),
  10
));

-- With configuration options
SELECT * FROM 
input_table, LATERAL TABLE(VECTOR_SEARCH(
  TABLE vector_table,
  input_table.vector_column,
  DESCRIPTOR(index_column),
  10,
  MAP['async', 'true', 'timeout', '100s']
));

-- Using named parameters
SELECT * FROM 
input_table, LATERAL TABLE(VECTOR_SEARCH(
  SEARCH_TABLE => TABLE vector_table,
  COLUMN_TO_QUERY => input_table.vector_column,
  COLUMN_TO_SEARCH => DESCRIPTOR(index_column),
  TOP_K => 10,
  CONFIG => MAP['async', 'true', 'timeout', '100s']
));

-- Searching with a constant value
SELECT *
FROM TABLE(VECTOR_SEARCH(
  TABLE vector_table,
  ARRAY[CAST(10 AS FLOAT), CAST(20 AS FLOAT)],
  DESCRIPTOR(index_column),
  10
));

Output

The output table contains all columns of the input table, all columns of the vector table, and a column named score that indicates the similarity between the input row and the matched row.
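The sketch below keeps only selected columns next to score. It assumes hypothetical tables queries(query_id, query_vector) and documents(doc_id, content, embedding) whose column names collide neither with each other nor with score.

```sql
-- Keep the query id, the matched document and its similarity score
SELECT query_id, doc_id, content, score
FROM queries, LATERAL TABLE(VECTOR_SEARCH(
  TABLE documents,
  queries.query_vector,
  DESCRIPTOR(embedding),
  3
));
```

This page does not state how score is computed, so check the vector table's connector documentation before filtering or ranking on it.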

Notes

  1. The table source backing the vector table must implement the interface org.apache.flink.table.connector.source.VectorSearchTableSource. See Vector Search Table Source for details.
  2. VECTOR_SEARCH only supports consuming append-only tables.
  3. VECTOR_SEARCH does not require the LATERAL keyword when the function call is not correlated with other tables. For example, if the search vector is a constant or literal value, LATERAL can be omitted.
