Важно: Данный раздел актуален для Платформы данных в Публичном облаке и On-Premise.
Движок таблицы (тип таблицы) определяет:
Наиболее универсальные и функциональные движки таблиц для задач с высокой загрузкой. Общим свойством этих движков является быстрая вставка данных с последующей фоновой обработкой данных. Движки *MergeTree
поддерживают репликацию данных (в Replicated* версиях движков), партиционирование, и другие возможности не поддержанные для других движков.
Движки семейства:
Движок MergeTree
, а также другие движки этого семейства (*MergeTree
) — это наиболее функциональные движки таблиц RT.WideStore.
Основная идея, заложенная в основу движков семейства MergeTree
следующая. Когда у вас есть огромное количество данных, которые должны быть вставлены в таблицу, вы должны быстро записать их по частям, а затем объединить части по некоторым правилам в фоновом режиме. Этот метод намного эффективнее, чем постоянная перезапись данных в хранилище при вставке.
Основные возможности:
Хранит данные, отсортированные по первичному ключу. Это позволяет создавать разреженный индекс небольшого объёма, который позволяет быстрее находить данные.
Позволяет оперировать партициями, если задан ключ партиционирования. RT.WideStore поддерживает отдельные операции с партициями, которые работают эффективнее, чем общие операции с этим же результатом над этими же данными. Также, RT.WideStore автоматически отсекает данные по партициям там, где ключ партиционирования указан в запросе. Это также увеличивает эффективность выполнения запросов.
Поддерживает репликацию данных. Для этого используется семейство таблиц ReplicatedMergeTree
. Подробнее читайте в разделе Репликация данных.
Поддерживает сэмплирование данных. При необходимости можно задать способ сэмплирования данных в таблице.
Примечание:
Движок Merge не относится к семейству
*MergeTree
.
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
INDEX index_name1 expr1 TYPE type1(...) GRANULARITY value1,
INDEX index_name2 expr2 TYPE type2(...) GRANULARITY value2
) ENGINE = MergeTree()
ORDER BY expr
[PARTITION BY expr]
[PRIMARY KEY expr]
[SAMPLE BY expr]
[TTL expr
[DELETE|TO DISK 'xxx'|TO VOLUME 'xxx' [, ...] ]
[WHERE conditions]
[GROUP BY key_expr [SET v1 = aggr_func(v1) [, v2 = aggr_func(v2) ...]] ] ]
[SETTINGS name=value, ...]
Описание параметров смотрите в описании запроса CREATE.
ENGINE
— имя и параметры движка. ENGINE = MergeTree()
. MergeTree
не имеет параметров.
ORDER BY
— ключ сортировки.
Кортеж столбцов или произвольных выражений. Пример: ORDER BY (CounterID, EventDate)
.
RT.WideStore использует ключ сортировки в качестве первичного ключа, если первичный ключ не задан в секции PRIMARY KEY
.
Чтобы отключить сортировку, используйте синтаксис ORDER BY tuple()
. Смотрите выбор первичного ключа.
PARTITION BY
— ключ партиционирования. Необязательный параметр.
Для партиционирования по месяцам используйте выражение toYYYYMM(date_column)
, где date_column
— столбец с датой типа Date. В этом случае имена партиций имеют формат "YYYYMM"
.
PRIMARY KEY
— первичный ключ, если он отличается от ключа сортировки. Необязательный параметр.
По умолчанию первичный ключ совпадает с ключом сортировки (который задаётся секцией ORDER BY
.) Поэтому в большинстве случаев секцию PRIMARY KEY
отдельно указывать не нужно.
SAMPLE BY
— выражение для сэмплирования. Необязательный параметр.
Если используется выражение для сэмплирования, то первичный ключ должен содержать его. Результат выражения для сэмплирования должен быть беззнаковым целым числом. Пример: SAMPLE BY intHash32(UserID) ORDER BY (CounterID, EventDate, intHash32(UserID))
.
TTL
— список правил, определяющих длительности хранения строк, а также задающих правила перемещения частей на определённые тома или диски. Необязательный параметр.
Выражение должно возвращать столбец Date
или DateTime
. Пример: TTL date + INTERVAL 1 DAY
.
Тип правила DELETE|TO DISK 'xxx'|TO VOLUME 'xxx'|GROUP BY
указывает действие, которое будет выполнено с частью: удаление строк (прореживание), перемещение (при выполнении условия для всех строк части) на определённый диск (TO DISK 'xxx'
) или том (TO VOLUME 'xxx'
), или агрегирование данных в устаревших строках. Поведение по умолчанию соответствует удалению строк (DELETE
). В списке правил может быть указано только одно выражение с поведением DELETE
.
Дополнительные сведения смотрите в разделе TTL для столбцов и таблиц
SETTINGS
— дополнительные параметры, регулирующие поведение MergeTree
(необязательные):
index_granularity
— максимальное количество строк данных между засечками индекса. По умолчанию — 8192. Смотрите Хранение данных.index_granularity_bytes
— максимальный размер гранул данных в байтах. По умолчанию — 10Mb. Чтобы ограничить размер гранул только количеством строк, установите значение 0 (не рекомендовано). Смотрите Хранение данных.min_index_granularity_bytes
— минимально допустимый размер гранул данных в байтах. Значение по умолчанию — 1024b. Для обеспечения защиты от случайного создания таблиц с очень низким значением index_granularity_bytes
. Смотрите Хранение данных.enable_mixed_granularity_parts
— включает или выключает переход к ограничению размера гранул с помощью настройки index_granularity_bytes
. Настройка index_granularity_bytes
улучшает производительность RT.WideStore при выборке данных из таблиц с большими (десятки и сотни мегабайтов) строками. Если у вас есть таблицы с большими строками, можно включить эту настройку, чтобы повысить эффективность запросов SELECT
.use_minimalistic_part_header_in_zookeeper
— Способ хранения заголовков кусков данных в ZooKeeper. Если use_minimalistic_part_header_in_zookeeper = 1
, то ZooKeeper хранит меньше данных. Подробнее читайте в описании настройки в разделе "Конфигурационные параметры сервера".min_merge_bytes_to_use_direct_io
— минимальный объём данных при слиянии, необходимый для прямого (небуферизованного) чтения/записи (direct I/O) на диск. При слиянии частей данных RT.WideStore вычисляет общий объём хранения всех данных, подлежащих слиянию. Если общий объём хранения всех данных для чтения превышает min_bytes_to_use_direct_io
байт, тогда RT.WideStore использует флаг O_DIRECT
при чтении данных с диска. Если min_merge_bytes_to_use_direct_io = 0
, тогда прямой ввод-вывод отключен. Значение по умолчанию: 10 * 1024 * 1024 * 1024
байтов.merge_with_ttl_timeout
— минимальное время в секундах перед повторным слиянием для удаления данных с истекшим TTL. По умолчанию: 14400
секунд (4 часа).merge_with_recompression_ttl_timeout
— минимальное время в секундах перед повторным слиянием для повторного сжатия данных с истекшим TTL. По умолчанию: 14400
секунд (4 часа).try_fetch_recompressed_part_timeout
— время ожидания (в секундах) перед началом слияния с повторным сжатием. В течение этого времени RT.WideStore пытается извлечь сжатую часть из реплики, которая назначила это слияние. Значение по умолчанию: 7200
секунд (2 часа).write_final_mark
— включает или отключает запись последней засечки индекса в конце куска данных, указывающей за последний байт. По умолчанию — 1. Не отключайте её.merge_max_block_size
— максимальное количество строк в блоке для операций слияния. Значение по умолчанию: 8192.storage_policy
— политика хранения данных. Смотрите Хранение данных таблицы на нескольких блочных устройствах.min_bytes_for_wide_part
, min_rows_for_wide_part
— минимальное количество байт/строк в куске данных для хранения в формате Wide
. Можно задать одну или обе настройки или не задавать ни одной. Подробнее см. в разделе Хранение данных.max_parts_in_total
— максимальное количество кусков во всех партициях.max_compress_block_size
— максимальный размер блоков несжатых данных перед сжатием для записи в таблицу. Вы также можете задать этот параметр в глобальных настройках (смотрите max_compress_block_size). Настройка, которая задается при создании таблицы, имеет более высокий приоритет, чем глобальная.min_compress_block_size
— минимальный размер блоков несжатых данных, необходимых для сжатия при записи следующей засечки. Вы также можете задать этот параметр в глобальных настройках (смотрите min_compress_block_size). Настройка, которая задается при создании таблицы, имеет более высокий приоритет, чем глобальная.max_partitions_to_read
— Ограничивает максимальное число партиций для чтения в одном запросе. Также возможно указать настройку max_partitions_to_read в глобальных настройках.Пример задания секций:
ENGINE MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID) SETTINGS index_granularity=8192
В примере мы устанавливаем партиционирование по месяцам.
Также мы задаем выражение для сэмплирования в виде хэша по идентификатору посетителя. Это позволяет псевдослучайным образом перемешать данные в таблице для каждого CounterID
и EventDate
. Если при выборке данных задать секцию SAMPLE, то RT.WideStore вернёт равномерно-псевдослучайную выборку данных для подмножества посетителей.
index_granularity
можно было не указывать, поскольку 8192 — это значение по умолчанию.
Устаревший способ создания таблицы
Таблица состоит из кусков данных (data parts), отсортированных по первичному ключу.
При вставке в таблицу создаются отдельные куски данных, каждый из которых лексикографически отсортирован по первичному ключу. Например, если первичный ключ — (CounterID, Date)
, то данные в куске будут лежать в порядке CounterID
, а для каждого CounterID
в порядке Date
.
Данные, относящиеся к разным партициям, разбиваются на разные куски. В фоновом режиме RT.WideStore выполняет слияния (merge) кусков данных для более эффективного хранения. Куски, относящиеся к разным партициям не объединяются. Механизм слияния не гарантирует, что все строки с одинаковым первичным ключом окажутся в одном куске.
Куски данных могут храниться в формате Wide
или Compact
. В формате Wide
каждый столбец хранится в отдельном файле, а в формате Compact
все столбцы хранятся в одном файле. Формат Compact
может быть полезен для повышения производительности при частом добавлении небольших объемов данных.
Формат хранения определяется настройками движка min_bytes_for_wide_part
и min_rows_for_wide_part
. Если число байт или строк в куске данных меньше значения, указанного в соответствующей настройке, тогда этот кусок данных хранится в формате Compact
. В противном случае кусок данных хранится в формате Wide
. Если ни одна из настроек не задана, куски данных хранятся в формате Wide
.
Каждый кусок данных логически делится на гранулы. Гранула — это минимальный неделимый набор данных, который RT.WideStore считывает при выборке данных. не разбивает строки и значения и гранула всегда содержит целое число строк. Первая строка гранулы помечается значением первичного ключа для этой строки (засечка). Для каждого куска данных RT.WideStore создаёт файл с засечками (индексный файл). Для каждого столбца, независимо от того, входит он в первичный ключ или нет, RT.WideStore также сохраняет эти же засечки. Засечки используются для поиска данных напрямую в файлах столбцов.
Размер гранул ограничен настройками движка index_granularity
и index_granularity_bytes
. Количество строк в грануле лежит в диапазоне [1, index_granularity]
, в зависимости от размера строк. Размер гранулы может превышать index_granularity_bytes
в том случае, когда размер единственной строки в грануле превышает значение настройки. В этом случае, размер гранулы равен размеру строки.
Рассмотрим первичный ключ — (CounterID, Date)
. В этом случае сортировку и индекс можно проиллюстрировать следующим образом:
Whole data: [-------------------------------------------------------------------------]
CounterID: [aaaaaaaaaaaaaaaaaabbbbcdeeeeeeeeeeeeefgggggggghhhhhhhhhiiiiiiiiikllllllll]
Date: [1111111222222233331233211111222222333211111112122222223111112223311122333]
Marks: | | | | | | | | | | |
a,1 a,2 a,3 b,3 e,2 e,3 g,1 h,2 i,1 i,3 l,3
Marks numbers: 0 1 2 3 4 5 6 7 8 9 10
Если в запросе к данным указать:
CounterID IN ('a', 'h')
, то сервер читает данные в диапазонах засечек [0, 3)
и [6, 8)
.CounterID IN ('a', 'h') AND Date = 3
, то сервер читает данные в диапазонах засечек [1, 3)
и [7, 8)
.Date = 3
, то сервер читает данные в диапазоне засечек [1, 10]
.Примеры выше показывают, что использование индекса всегда эффективнее, чем full scan.
Разреженный индекс допускает чтение лишних строк. При чтении одного диапазона первичного ключа, может быть прочитано до index_granularity * 2
лишних строк в каждом блоке данных.
Разреженный индекс почти всегда помещается в оперативную память и позволяет работать с очень большим количеством строк в таблицах.
RT.WideStore не требует уникального первичного ключа. Можно вставить много строк с одинаковым первичным ключом.
Ключ в PRIMARY KEY
и ORDER BY
может иметь тип Nullable
. За поддержку этой возможности отвечает настройка allow_nullable_key.
При сортировке с использованием выражения ORDER BY
для значений NULL
всегда работает принцип NULLS_LAST.
Количество столбцов в первичном ключе не ограничено явным образом. В зависимости от структуры данных в первичный ключ можно включать больше или меньше столбцов. Это может:
Увеличить эффективность индекса.
Пусть первичный ключ — (a, b)
, тогда добавление ещё одного столбца c
повысит эффективность, если выполнены условия:
c
.index_granularity
) диапазоны данных с одинаковыми значениями (a, b)
. Иначе говоря, когда добавление ещё одного столбца позволит пропускать достаточно длинные диапазоны данных.Улучшить сжатие данных.
RT.WideStore сортирует данные по первичному ключу, поэтому чем выше однородность, тем лучше сжатие.
Обеспечить дополнительную логику при слиянии кусков данных в движках CollapsingMergeTree и SummingMergeTree.
В этом случае имеет смысл указать отдельный ключ сортировки, отличающийся от первичного ключа.
Длинный первичный ключ будет негативно влиять на производительность вставки и потребление памяти, однако на производительность RT.WideStore при запросах SELECT
лишние столбцы в первичном ключе не влияют.
Вы можете создать таблицу без первичного ключа, используя синтаксис ORDER BY tuple()
. В этом случае RT.WideStore хранит данные в порядке вставки. Если вы хотите сохранить порядок данных при вставке данных с помощью запросов INSERT ... SELECT
, установите max_insert_threads = 1.
Чтобы выбрать данные в первоначальном порядке, используйте однопоточные запросы `SELECT.
Существует возможность задать первичный ключ (выражение, значения которого будут записаны в индексный файл для каждой засечки), отличный от ключа сортировки (выражение, по которому будут упорядочены строки в кусках данных). Кортеж выражения первичного ключа при этом должен быть префиксом кортежа выражения ключа сортировки.
Данная возможность особенно полезна при использовании движков SummingMergeTree и AggregatingMergeTree. В типичном сценарии использования этих движков таблица содержит столбцы двух типов: измерения (dimensions) и меры (measures). Типичные запросы агрегируют значения столбцов-мер с произвольной группировкой и фильтрацией по измерениям. Так как SummingMergeTree
и AggregatingMergeTree
производят фоновую агрегацию строк с одинаковым значением ключа сортировки, приходится добавлять в него все столбцы-измерения. В результате выражение ключа содержит большой список столбцов, который приходится постоянно расширять при добавлении новых измерений.
В этом сценарии имеет смысл оставить в первичном ключе всего несколько столбцов, которые обеспечат эффективную фильтрацию по индексу, а остальные столбцы-измерения добавить в выражение ключа сортировки.
ALTER ключа сортировки — лёгкая операция, так как при одновременном добавлении нового столбца в таблицу и ключ сортировки не нужно изменять данные кусков (они остаются упорядоченными и по новому выражению ключа).
Для запросов SELECT
RT.WideStore анализирует возможность использования индекса. Индекс может использоваться, если в секции WHERE/PREWHERE
, в качестве одного из элементов конъюнкции, или целиком, есть выражение, представляющее операции сравнения на равенства, неравенства, а также IN
или LIKE
с фиксированным префиксом, над столбцами или выражениями, входящими в первичный ключ или ключ партиционирования, либо над некоторыми частично монотонными функциями от этих столбцов, а также логические связки над такими выражениями.
Таким образом, обеспечивается возможность быстро выполнять запросы по одному или многим диапазонам первичного ключа. Например, в указанном примере будут быстро работать запросы для конкретного счётчика; для конкретного счётчика и диапазона дат; для конкретного счётчика и даты, для нескольких счётчиков и диапазона дат и т. п.
Рассмотрим движок сконфигурированный следующим образом:
ENGINE MergeTree() PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate) SETTINGS index_granularity=8192
В этом случае в запросах:
SELECT count() FROM table WHERE EventDate = toDate(now()) AND CounterID = 34
SELECT count() FROM table WHERE EventDate = toDate(now()) AND (CounterID = 34 OR CounterID = 42)
SELECT count() FROM table WHERE ((EventDate >= toDate('2014-01-01') AND EventDate <= toDate('2014-01-31')) OR EventDate = toDate('2014-05-01')) AND CounterID IN (101500, 731962, 160656) AND (CounterID = 101500 OR EventDate != toDate('2014-05-01'))
RT.WideStore будет использовать индекс по первичному ключу для отсечения не подходящих данных, а также ключ партиционирования по месяцам для отсечения партиций, которые находятся в не подходящих диапазонах дат.
Запросы выше показывают, что индекс используется даже для сложных выражений. Чтение из таблицы организовано так, что использование индекса не может быть медленнее, чем full scan.
В примере ниже индекс не может использоваться.
SELECT count() FROM table WHERE CounterID = 34 OR URL LIKE '%upyachka%'
Чтобы проверить, сможет ли RT.WideStore использовать индекс при выполнении запроса, используйте настройки force_index_by_date и force_primary_key.
Ключ партиционирования по месяцам обеспечивает чтение только тех блоков данных, которые содержат даты из нужного диапазона. При этом блок данных может содержать данные за многие даты (до целого месяца). В пределах одного блока данные упорядочены по первичному ключу, который может не содержать дату в качестве первого столбца. В связи с этим, при использовании запроса с указанием условия только на дату, но не на префикс первичного ключа, будет читаться данных больше, чем за одну дату.
Рассмотрим, например, дни месяца. Они образуют последовательность монотонную в течение одного месяца, но не монотонную на более длительных периодах. Это частично-монотонная последовательность. Если пользователь создаёт таблицу с частично-монотонным первичным ключом, RT.WideStore как обычно создаёт разреженный индекс. Когда пользователь выбирает данные из такого рода таблиц, RT.WideStore анализирует условия запроса. Если пользователь хочет получить данные между двумя метками индекса, и обе эти метки находятся внутри одного месяца, RT.WideStore может использовать индекс в данном конкретном случае, поскольку он может рассчитать расстояние между параметрами запроса и индексными метками.
RT.WideStore не может использовать индекс, если значения первичного ключа в диапазоне параметров запроса не представляют собой монотонную последовательность. В этом случае RT.WideStore использует метод полного сканирования.
RT.WideStore использует эту логику не только для последовательностей дней месяца, но и для любого частично-монотонного первичного ключа.
Объявление индексов при определении столбцов в запросе CREATE
.
INDEX index_name expr TYPE type(...) GRANULARITY granularity_value
Для таблиц семейства *MergeTree
можно задать дополнительные индексы в секции столбцов.
Индексы агрегируют для заданного выражения некоторые данные, а потом при SELECT
запросе используют для пропуска блоков данных (пропускаемый блок состоит из гранул данных в количестве равном гранулярности данного индекса), на которых секция WHERE
не может быть выполнена, тем самым уменьшая объём данных читаемых с диска.
Пример:
CREATE TABLE table_name
(
u64 UInt64,
i32 Int32,
s String,
...
INDEX a (u64 * i32, s) TYPE minmax GRANULARITY 3,
INDEX b (u64 * length(s)) TYPE set(1000) GRANULARITY 4
) ENGINE = MergeTree()
...
Эти индексы смогут использоваться для оптимизации следующих запросов
SELECT count() FROM table WHERE s < 'z'
SELECT count() FROM table WHERE u64 * i32 == 10 AND u64 * length(s) >= 1234
minmax
— хранит минимум и максимум выражения (если выражение - Tuple, то для каждого элемента Tuple
), используя их для пропуска блоков аналогично первичному ключу.
set(max_rows)
— хранит уникальные значения выражения на блоке в количестве не более max_rows
(если max_rows = 0
, то ограничений нет), используя их для пропуска блоков, оценивая выполнимость WHERE
выражения на хранимых данных.
ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)
— хранит фильтр Блума, содержащий все N-граммы блока данных. Работает только с данными форматов String, FixedString и Map с ключами типа String
или fixedString
. Может быть использован для оптимизации выражений EQUALS
, LIKE
и IN
.
n
— размер N-граммы,size_of_bloom_filter_in_bytes
— размер в байтах фильтра Блума (можно использовать большие значения, например, 256 или 512, поскольку сжатие компенсирует возможные издержки).number_of_hash_functions
— количество хеш-функций, использующихся в фильтре Блума.random_seed
— состояние генератора случайных чисел для хеш-функций фильтра Блума.tokenbf_v1(size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)
— то же, что иngrambf_v1
, но хранит токены вместо N-грамм. Токены — это последовательности символов, разделенные не буквенно-цифровыми символами.
bloom_filter([false_positive])
— фильтр Блума для указанных стоблцов.
Необязательный параметр false_positive
— это вероятность получения ложноположительного срабатывания. Возможные значения: (0, 1). Значение по умолчанию: 0.025.
Поддерживаемые типы данных: Int*
, UInt*
, Float*
, Enum
, Date
, DateTime
, String
, FixedString
.
Фильтром могут пользоваться функции: equals, notEquals, in, notIn, has, hasAny, hasAll.
Примеры:
INDEX b (u64 * length(str), i32 + f64 * 100, date, str) TYPE minmax GRANULARITY 4
INDEX b (u64 * length(str), i32 + f64 * 100, date, str) TYPE set(100) GRANULARITY 4
Условия в секции WHERE
содержат вызовы функций, оперирующих со столбцами. Если столбец - часть индекса, RT.WideStore пытается использовать индекс при выполнении функции. Для разных видов индексов, RT.WideStore поддерживает различные наборы функций, которые могут использоваться индексами.
Индекс set
используется со всеми функциями. Наборы функций для остальных индексов представлены в таблице ниже.
Функция (оператор) / Индекс |
primary key |
minmax |
ngrambf_v1 |
tokenbf_v1 |
bloom_filter |
---|---|---|---|---|---|
equals (=, ==) | ✔ | ✔ | ✔ | ✔ | ✔ |
notEquals(!=, <>) | ✔ | ✔ | ✔ | ✔ | ✔ |
like | ✔ | ✔ | ✔ | ✔ | ✗ |
notLike | ✔ | ✔ | ✔ | ✔ | ✗ |
startsWith | ✔ | ✔ | ✔ | ✔ | ✗ |
endsWith | ✗ | ✗ | ✔ | ✔ | ✗ |
multiSearchAny | ✗ | ✗ | ✔ | ✗ | ✗ |
in | ✔ | ✔ | ✔ | ✔ | ✔ |
notIn | ✔ | ✔ | ✔ | ✔ | ✔ |
less (\<) | ✔ | ✔ | ✗ | ✗ | ✗ |
greater (>) | ✔ | ✔ | ✗ | ✗ | ✗ |
lessOrEquals (\<=) | ✔ | ✔ | ✗ | ✗ | ✗ |
greaterOrEquals (>=) | ✔ | ✔ | ✗ | ✗ | ✗ |
empty | ✔ | ✔ | ✗ | ✗ | ✗ |
notEmpty | ✔ | ✔ | ✗ | ✗ | ✗ |
hasToken | ✗ | ✗ | ✗ | ✔ | ✗ |
Функции с постоянным агрументом, который меньше, чем размер ngram не могут использовать индекс ngrambf_v1
для оптимизации запроса.
Фильтры Блума могут иметь ложнопозитивные срабатывания, следовательно индексы ngrambf_v1
, tokenbf_v1
и bloom_filter
невозможно использовать для оптимизации запросов, в которых результат функции предполается false, например:
s LIKE '%test%'
NOT s NOT LIKE '%test%'
s = 1
NOT s != 1
startsWith(s, 'test')
NOT s LIKE '%test%'
s NOT LIKE '%test%'
NOT s = 1
s != 1
NOT startsWith(s, 'test')
Проекции похожи на материализованные представления, но определяются на уровне кусков данных. Это обеспечивает гарантии согласованности данных наряду с автоматическим использованием в запросах.
Проекции — это экспериментальная возможность. Чтобы включить поддержку проекций, установите настройку allow_experimental_projection_optimization в значение 1
. См. также настройку force_optimize_projection .
Проекции не поддерживаются для запросов SELECT
с модификатором FINAL.
Запрос проекции — это то, что определяет проекцию. Такой запрос неявно выбирает данные из родительской таблицы.
Синтаксис:
SELECT <column list expr> [GROUP BY] <group keys expr> [ORDER BY] <expr>
Проекции можно изменить или удалить с помощью запроса ALTER.
Проекции хранятся в каталоге куска данных. Это похоже на хранение индексов, но используется подкаталог, в котором хранится анонимный кусок таблицы MergeTree
. Таблица создается запросом определения проекции. Если присутствует секция GROUP BY
, то используется движок AggregatingMergeTree, а все агрегатные функции преобразуются в AggregateFunction
. Если присутствует секция ORDER BY
, таблица MergeTree
использует ее в качестве выражения для первичного ключа. Во время процесса слияния кусок данных проекции объединяется с помощью процедуры слияния хранилища. Контрольная сумма куска данных родительской таблицы включает кусок данных проекции. Другие процедуры аналогичны индексам пропуска данных.
Для конкурентного доступа к таблице используется мультиверсионность. То есть, при одновременном чтении и обновлении таблицы, данные будут читаться из набора кусочков, актуального на момент запроса. Длинных блокировок нет. Вставки никак не мешают чтениям.
Чтения из таблицы автоматически распараллеливаются.
Определяет время жизни значений.
Секция TTL
может быть установлена как для всей таблицы, так и для каждого отдельного столбца. Для таблиц можно установить правила TTL
для фонового перемещения кусков данных на целевые диски или тома, или правила повторного сжатия кусков данных.
Выражения должны возвращать тип Date или DateTime.
Синтаксис:
Для задания времени жизни столбца:
TTL time_column
TTL time_column + interval
Чтобы задать interval
, используйте операторы интервала времени, например:
TTL date_time + INTERVAL 1 MONTH
TTL date_time + INTERVAL 15 HOUR
Когда срок действия значений в столбце истечёт, RT.WideStore заменит их значениями по умолчанию для типа данных столбца. Если срок действия всех значений столбцов в части данных истек, RT.WideStore удаляет столбец из куска данных в файловой системе.
Секцию TTL
нельзя использовать для ключевых столбцов.
Примеры:
Создание таблицы с TTL
:
CREATE TABLE example_table
(
d DateTime,
a Int TTL d + INTERVAL 1 MONTH,
b Int TTL d + INTERVAL 1 MONTH,
c String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(d)
ORDER BY d;
Добавление TTL
на колонку существующей таблицы:
ALTER TABLE example_table
MODIFY COLUMN
c String TTL d + INTERVAL 1 DAY;
Изменение TTL
у колонки:
ALTER TABLE example_table
MODIFY COLUMN
c String TTL d + INTERVAL 1 MONTH;
Для таблицы можно задать одно выражение для устаревания данных, а также несколько выражений, при срабатывании которых данные будут перемещены на некоторый диск или том. Когда некоторые данные в таблице устаревают, RT.WideStore удаляет все соответствующие строки. Операции перемещения или повторного сжатия данных выполняются только когда устаревают все данные в куске.
TTL expr
[DELETE|RECOMPRESS codec_name1|TO DISK 'xxx'|TO VOLUME 'xxx'][, DELETE|RECOMPRESS codec_name2|TO DISK 'aaa'|TO VOLUME 'bbb'] ...
[WHERE conditions]
[GROUP BY key_expr [SET v1 = aggr_func(v1) [, v2 = aggr_func(v2) ...]] ]
За каждым TTL
выражением может следовать тип действия, которое выполняется после достижения времени, соответствующего результату TTL
выражения:
DELETE
- удалить данные (действие по умолчанию);RECOMPRESS codec_name
- повторно сжать данные с помощью кодека codec_name
;TO DISK 'aaa'
- переместить данные на диск aaa
;TO VOLUME 'bbb'
- переместить данные на том bbb
;GROUP BY
- агрегировать данные.В секции WHERE
можно задать условие удаления или агрегирования устаревших строк (для перемещения и сжатия условие WHERE
не применимо).
Колонки, по которым агрегируются данные в GROUP BY
, должны являться префиксом первичного ключа таблицы.
Если колонка не является частью выражения GROUP BY
и не задается напрямую в секции SET
, в результирующих строках она будет содержать случайное значение, взятое из одной из сгруппированных строк (как будто к ней применяется агрегирующая функция any
).
Примеры:
Создание таблицы с TTL
:
CREATE TABLE example_table
(
d DateTime,
a Int
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(d)
ORDER BY d
TTL d + INTERVAL 1 MONTH DELETE,
d + INTERVAL 1 WEEK TO VOLUME 'aaa',
d + INTERVAL 2 WEEK TO DISK 'bbb';
Изменение TTL
:
ALTER TABLE example_table
MODIFY TTL d + INTERVAL 1 DAY;
Создание таблицы, в которой строки устаревают через месяц. Устаревшие строки удаляются, если дата выпадает на понедельник:
CREATE TABLE table_with_where
(
d DateTime,
a Int
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(d)
ORDER BY d
TTL d + INTERVAL 1 MONTH DELETE WHERE toDayOfWeek(d) = 1;
Создание таблицы, в которой куски с устаревшими данными повторно сжимаются:
CREATE TABLE table_for_recompression
(
d DateTime,
key UInt64,
value String
)
ENGINE MergeTree()
ORDER BY tuple()
PARTITION BY key
TTL d + INTERVAL 1 MONTH RECOMPRESS CODEC(ZSTD(17)), d + INTERVAL 1 YEAR RECOMPRESS CODEC(LZ4HC(10))
SETTINGS min_rows_for_wide_part = 0, min_bytes_for_wide_part = 0;
Создание таблицы, где устаревшие строки агрегируются. В результирующих строках колонка x
содержит максимальное значение по сгруппированным строкам, y
— минимальное значение, а d
— случайное значение из одной из сгуппированных строк.
CREATE TABLE table_for_aggregation
(
d DateTime,
k1 Int,
k2 Int,
x Int,
y Int
)
ENGINE = MergeTree
ORDER BY (k1, k2)
TTL d + INTERVAL 1 MONTH GROUP BY k1, k2 SET x = max(x), y = min(y);
Данные с истекшим TTL
удаляются, когда RT.WideStore мёржит куски данных.
Когда RT.WideStore видит, что некоторые данные устарели, он выполняет внеплановые мёржи. Для управления частотой подобных мёржей, можно задать настройку merge_with_ttl_timeout
. Если её значение слишком низкое, придется выполнять много внеплановых мёржей, которые могут начать потреблять значительную долю ресурсов сервера.
Если вы выполните запрос SELECT
между слияниями вы можете получить устаревшие данные. Чтобы избежать этого используйте запрос OPTIMIZE перед SELECT
.
См. также:
Движки таблиц семейства MergeTree
могут хранить данные на нескольких блочных устройствах. Это может оказаться полезным, например, при неявном разделении данных одной таблицы на «горячие» и «холодные». Наиболее свежая часть занимает малый объём и запрашивается регулярно, а большой хвост исторических данных запрашивается редко. При наличии в системе нескольких дисков, «горячая» часть данных может быть размещена на быстрых дисках (например, на NVMe SSD или в памяти), а холодная на более медленных (например, HDD).
Минимальной перемещаемой единицей для MergeTree
является кусок данных (data part). Данные одного куска могут находится только на одном диске. Куски могут перемещаться между дисками в фоне, согласно пользовательским настройкам, а также с помощью запросов ALTER.
У всех описанных сущностей при создании указываются имена, можно найти в системных таблицах system.storage_policies и system.disks. Имя политики хранения можно указать в настройке storage_policy
движков таблиц семейства MergeTree
.
Диски, тома и политики хранения задаются внутри тега <storage_configuration>
в основном файле config.xml
или в отдельном файле в директории config.d
.
Структура конфигурации:
<storage_configuration>
<disks>
<disk_name_1> <!-- disk name -->
<path>/mnt/fast_ssd/clickhouse/</path>
</disk_name_1>
<disk_name_2>
<path>/mnt/hdd1/clickhouse/</path>
<keep_free_space_bytes>10485760</keep_free_space_bytes>
</disk_name_2>
<disk_name_3>
<path>/mnt/hdd2/clickhouse/</path>
<keep_free_space_bytes>10485760</keep_free_space_bytes>
</disk_name_3>
...
</disks>
...
</storage_configuration>
Теги:
<disk_name_N>
— имя диска. Имена должны быть разными для всех дисков.path
— путь по которому будут храниться данные сервера (каталоги data
и shadow
), должен быть терминирован /
.keep_free_space_bytes
— размер зарезервированного свободного места на диске.Порядок задания дисков не имеет значения.
Общий вид конфигурации политик хранения:
<storage_configuration>
...
<policies>
<policy_name_1>
<volumes>
<volume_name_1>
<disk>disk_name_from_disks_configuration</disk>
<max_data_part_size_bytes>1073741824</max_data_part_size_bytes>
</volume_name_1>
<volume_name_2>
<!-- configuration -->
</volume_name_2>
<!-- more volumes -->
</volumes>
<move_factor>0.2</move_factor>
</policy_name_1>
<policy_name_2>
<!-- configuration -->
</policy_name_2>
<!-- more policies -->
</policies>
...
</storage_configuration>
Тэги:
policy_name_N
— название политики. Названия политик должны быть уникальны.volume_name_N
— название тома. Названия томов должны быть уникальны.disk
— диск, находящийся внутри тома.max_data_part_size_bytes
— максимальный размер куска данных, который может находиться на любом из дисков этого тома. Если в результате слияния размер куска ожидается больше, чем max_data_part_size_bytes, то этот кусок будет записан в следующий том. В основном эта функция позволяет хранить новые / мелкие куски на горячем (SSD) томе и перемещать их на холодный (HDD) том, когда они достигают большого размера. Не используйте этот параметр, если политика имеет только один том.move_factor
— доля доступного свободного места на томе, если места становится меньше, то данные начнут перемещение на следующий том, если он есть (по умолчанию 0.1). Для перемещения куски сортируются по размеру от большего к меньшему (по убыванию) и выбираются куски, совокупный размер которых достаточен для соблюдения условия move_factor
, если совокупный размер всех партов недостаточен, будут перемещены все парты.prefer_not_to_merge
— Отключает слияние кусков данных, хранящихся на данном томе. Если данная настройка включена, то слияние данных, хранящихся на данном томе, не допускается. Это позволяет контролировать работу RT.WideStore с медленными дисками.Примеры конфигураций:
<storage_configuration>
...
<policies>
<hdd_in_order> <!-- policy name -->
<volumes>
<single> <!-- volume name -->
<disk>disk1</disk>
<disk>disk2</disk>
</single>
</volumes>
</hdd_in_order>
<moving_from_ssd_to_hdd>
<volumes>
<hot>
<disk>fast_ssd</disk>
<max_data_part_size_bytes>1073741824</max_data_part_size_bytes>
</hot>
<cold>
<disk>disk1</disk>
</cold>
</volumes>
<move_factor>0.2</move_factor>
</moving_from_ssd_to_hdd>
<small_jbod_with_external_no_merges>
<volumes>
<main>
<disk>jbod1</disk>
</main>
<external>
<disk>external</disk>
<prefer_not_to_merge>true</prefer_not_to_merge>
</external>
</volumes>
</small_jbod_with_external_no_merges>
</policies>
...
</storage_configuration>
В приведенном примере, политика hdd_in_order
реализует прицип round-robin. Так как в политике есть всего один том (single
), то все записи производятся на его диски по круговому циклу. Такая политика может быть полезна при наличии в системе нескольких похожих дисков, но при этом не сконфигурирован RAID. Учтите, что каждый отдельный диск ненадёжен и чтобы не потерять важные данные это необходимо скомпенсировать за счет хранения данных в трёх копиях.
Если система содержит диски различных типов, то может пригодиться политика moving_from_ssd_to_hdd
. В томе hot
находится один SSD-диск (fast_ssd
), а также задается ограничение на максимальный размер куска, который может храниться на этом томе (1GB). Все куски такой таблицы больше 1GB будут записываться сразу на том cold
, в котором содержится один HDD-диск disk1
. Также при заполнении диска fast_ssd
более чем на 80% данные будут переноситься на диск disk1
фоновым процессом.
Порядок томов в политиках хранения важен, при достижении условий на переполнение тома данные переносятся на следующий. Порядок дисков в томах так же важен, данные пишутся по очереди на каждый из них.
После задания конфигурации политик хранения их можно использовать, как настройку при создании таблиц:
CREATE TABLE table_with_non_default_policy (
EventDate Date,
OrderID UInt64,
BannerID UInt64,
SearchPhrase String
) ENGINE = MergeTree
ORDER BY (OrderID, BannerID)
PARTITION BY toYYYYMM(EventDate)
SETTINGS storage_policy = 'moving_from_ssd_to_hdd'
По умолчанию используется политика хранения default
в которой есть один том и один диск, указанный в <path>
. Изменить политику хранения после создания таблицы можно при помощи запроса [ALTER TABLE ... MODIFY SETTING]. При этом необходимо учесть, что новая политика должна содержать все тома и диски предыдущей политики с теми же именами.
Количество потоков для фоновых перемещений кусков между дисками можно изменить с помощью настройки background_move_pool_size
В таблицах MergeTree
данные попадают на диск несколькими способами:
INSERT
).Во всех случаях, кроме мутаций и заморозки партиций, при записи куска выбирается том и диск в соответствии с указанной конфигурацией хранилища:
unreserved_space > current_part_size
) и который позволяет записывать куски требуемого размера max_data_part_size_bytes > current_part_size
.unreserved_space - keep_free_space_bytes > current_part_size
)Мутации и запросы заморозки партиций в реализации используют жесткие ссылки. Жесткие ссылки между различными дисками не поддерживаются, поэтому в случае таких операций куски размещаются на тех же дисках, что и исходные.
В фоне куски перемещаются между томами на основе информации о занятом месте (настройка move_factor
) по порядку, в котором указаны тома в конфигурации. Данные никогда не перемещаются с последнего тома и на первый том. Следить за фоновыми перемещениями можно с помощью системных таблиц system.part_log (поле type = MOVE_PART
) и system.parts (поля path
и disk
). Также подробная информация о перемещениях доступна в логах сервера. С помощью запроса ALTER TABLE … MOVE PART|PARTITION … TO VOLUME|DISK … пользователь может принудительно перенести кусок или партицию с одного раздела на другой. При этом учитываются все ограничения, указанные для фоновых операций. Запрос самостоятельно инициирует процесс перемещения не дожидаясь фоновых операций. В случае недостатка места или неудовлетворения ограничениям пользователь получит сообщение об ошибке.
Перемещения данных не взаимодействуют с репликацией данных, поэтому на разных репликах одной и той же таблицы могут быть указаны разные политики хранения.
После выполнения фоновых слияний или мутаций старые куски не удаляются сразу, а через некоторое время (табличная настройка old_parts_lifetime
). Также они не перемещаются на другие тома или диски, поэтому до момента удаления они продолжают учитываться при подсчёте занятого дискового пространства.
Пользователь может сбалансированно распределять новые большие куски данных по разным дискам тома JBOD, используя настройку min_bytes_to_rebalance_partition_over_jbod.
Таблицы семейства MergeTree
могут хранить данные в сервисе S3 при использовании диска типа s3
.
Конфигурация:
<storage_configuration>
...
<disks>
<s3>
<type>s3</type>
<endpoint>https://storage.yandexcloud.net/my-bucket/root-path/</endpoint>
<access_key_id>your_access_key_id</access_key_id>
<secret_access_key>your_secret_access_key</secret_access_key>
<region></region>
<proxy>
<uri>http://proxy1</uri>
<uri>http://proxy2</uri>
</proxy>
<connect_timeout_ms>10000</connect_timeout_ms>
<request_timeout_ms>5000</request_timeout_ms>
<retry_attempts>10</retry_attempts>
<single_read_retries>4</single_read_retries>
<min_bytes_for_seek>1000</min_bytes_for_seek>
<metadata_path>/var/lib/clickhouse/disks/s3/</metadata_path>
<cache_enabled>true</cache_enabled>
<cache_path>/var/lib/clickhouse/disks/s3/cache/</cache_path>
<skip_access_check>false</skip_access_check>
</s3>
</disks>
...
</storage_configuration>
Обязательные параметры:
endpoint
— URL точки приема запроса на стороне S3 в форматах path
или virtual hosted
. URL точки должен содержать бакет и путь к корневой директории на сервере, где хранятся данные.access_key_id
— id ключа доступа к S3.secret_access_key
— секретный ключ доступа к S3.Необязательные параметры:
region
— название региона S3.use_environment_credentials
— признак, нужно ли считывать учетные данные AWS из сетевого окружения, а также из переменных окружения AWS_ACCESS_KEY_ID
, AWS_SECRET_ACCESS_KEY
и AWS_SESSION_TOKEN
, если они есть. Значение по умолчанию: false
.use_insecure_imds_request
— признак, нужно ли использовать менее безопасное соединение при выполнении запроса к IMDS при получении учётных данных из метаданных Amazon EC2. Значение по умолчанию: false
.proxy
— конфигурация прокси-сервера для конечной точки S3. Каждый элемент uri
внутри блока proxy
должен содержать URL прокси-сервера.connect_timeout_ms
— таймаут подключения к сокету в миллисекундах. Значение по умолчанию: 10 секунд.request_timeout_ms
— таймаут выполнения запроса в миллисекундах. Значение по умолчанию: 5 секунд.retry_attempts
— число попыток выполнения запроса в случае возникновения ошибки. Значение по умолчанию: 10
.single_read_retries
— число попыток выполнения запроса в случае возникновения ошибки в процессе чтения. Значение по умолчанию: 4
.min_bytes_for_seek
— минимальное количество байтов, которые используются для операций поиска вместо последовательного чтения. Значение по умолчанию: 1 МБайт.metadata_path
— путь к локальному файловому хранилищу для хранения файлов с метаданными для S3. Значение по умолчанию: /var/lib/clickhouse/disks/<disk_name>/
.cache_enabled
— признак, разрешено ли хранение кэша засечек и файлов индекса в локальной файловой системе. Значение по умолчанию: true
.cache_path
— путь в локальной файловой системе, где будут храниться кэш засечек и файлы индекса. Значение по умолчанию: /var/lib/clickhouse/disks/<disk_name>/cache/
.skip_access_check
— признак, выполнять ли проверку доступов при запуске диска. Если установлено значение true
, то проверка не выполняется. Значение по умолчанию: false
.Диск S3 может быть сконфигурирован как main
или cold
:
<storage_configuration>
...
<disks>
<s3>
<type>s3</type>
<endpoint>https://storage.yandexcloud.net/my-bucket/root-path/</endpoint>
<access_key_id>your_access_key_id</access_key_id>
<secret_access_key>your_secret_access_key</secret_access_key>
</s3>
</disks>
<policies>
<s3_main>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3_main>
<s3_cold>
<volumes>
<main>
<disk>default</disk>
</main>
<external>
<disk>s3</disk>
</external>
</volumes>
<move_factor>0.2</move_factor>
</s3_cold>
</policies>
...
</storage_configuration>
Если диск сконфигурирован как cold
, данные будут переноситься в S3 при срабатывании правил TTL или когда свободное место на локальном диске станет меньше порогового значения, которое определяется как move_factor * disk_size
.
_part
— Имя куска._part_index
— Номер куска по порядку в результате запроса._partition_id
— Имя партиции._part_uuid
— Уникальный идентификатор куска (если включена MergeTree настройка assign_part_uuids
)._partition_value
— Значения (кортеж) выражения partition by
._sample_factor
— Коэффициент сэмплирования (из запроса).Движок отличается от MergeTree тем, что выполняет удаление дублирующихся записей с одинаковым значением ключа сортировки (секция ORDER BY
, не PRIMARY KEY
).
Дедупликация данных производится лишь во время слияний. Слияние происходят в фоне в неизвестный момент времени, на который вы не можете ориентироваться. Некоторая часть данных может остаться необработанной. Хотя вы можете вызвать внеочередное слияние с помощью запроса OPTIMIZE
, на это не стоит рассчитывать, так как запрос OPTIMIZE
приводит к чтению и записи большого объёма данных.
Таким образом, ReplacingMergeTree
подходит для фоновой чистки дублирующихся данных в целях экономии места, но не даёт гарантии отсутствия дубликатов.
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
)
ENGINE = ReplacingMergeTree([ver])
[PARTITION BY expr]
[ORDER BY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]
Описание параметров запроса смотрите в описании запроса.
"ВНИМАНИЕ"
Уникальность строк определяется
ORDER BY
секцией таблицы, а неPRIMARY KEY
.
ver
— столбец с номером версии. Тип UInt*
, Date
, DateTime
или DateTime64
. Необязательный параметр.
При слиянии ReplacingMergeTree
оставляет только строку для каждого уникального ключа сортировки:
- Последнюю в выборке, если `ver` не задан. Под выборкой здесь понимается набор строк в наборе кусков данных, участвующих в слиянии. Последний по времени создания кусок (последняя вставка) будет последним в выборке. Таким образом, после дедупликации для каждого значения ключа сортировки останется самая последняя строка из самой последней вставки.
- С максимальной версией, если `ver` задан. Если `ver` одинаковый у нескольких строк, то для них используется правило -- если `ver` не задан, т.е. в результате слияния останется самая последняя строка из самой последней вставки.
Пример:
-- without ver - the last inserted 'wins'
CREATE TABLE myFirstReplacingMT
(
`key` Int64,
`someCol` String,
`eventTime` DateTime
)
ENGINE = ReplacingMergeTree
ORDER BY key;
INSERT INTO myFirstReplacingMT Values (1, 'first', '2020-01-01 01:01:01');
INSERT INTO myFirstReplacingMT Values (1, 'second', '2020-01-01 00:00:00');
SELECT * FROM myFirstReplacingMT FINAL;
┌─key─┬─someCol─┬────────────eventTime─┐
│ 1 │ second │ 2020-01-01 00:00:00 │
└─────┴──────────┴───────────────────────┘
-- with ver - the row with the biggest ver 'wins'
CREATE TABLE mySecondReplacingMT
(
`key` Int64,
`someCol` String,
`eventTime` DateTime
)
ENGINE = ReplacingMergeTree(eventTime)
ORDER BY key;
INSERT INTO mySecondReplacingMT Values (1, 'first', '2020-01-01 01:01:01');
INSERT INTO mySecondReplacingMT Values (1, 'second', '2020-01-01 00:00:00');
SELECT * FROM mySecondReplacingMT FINAL;
┌─key─┬─someCol─┬────────────eventTime─┐
│ 1 │ first │ 2020-01-01 01:01:01 │
└─────┴──────────┴───────────────────────┘
При создании таблицы ReplacingMergeTree
используются те же секции, что и при создании таблицы MergeTree
.
Устаревший способ создания таблицы
Движок наследует функциональность MergeTree. Отличие заключается в том, что для таблиц SummingMergeTree
при слиянии кусков данных RT.WideStore все строки с одинаковым первичным ключом (точнее, с одинаковым ключом сортировки) заменяет на одну, которая хранит только суммы значений из столбцов с цифровым типом данных. Если ключ сортировки подобран таким образом, что одному значению ключа соответствует много строк, это значительно уменьшает объём хранения и ускоряет последующую выборку данных.
Мы рекомендуем использовать движок в паре с MergeTree
. В MergeTree
храните полные данные, а SummingMergeTree
используйте для хранения агрегированных данных, например, при подготовке отчетов. Такой подход позволит не утратить ценные данные из-за неправильно выбранного первичного ключа.
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
)
ENGINE = SummingMergeTree([columns])
[PARTITION BY expr]
[ORDER BY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]
Описание параметров запроса смотрите в описании запроса.
Параметры SummingMergeTree
columns
— кортеж с именами столбцов, в которых будут суммироваться данные. Необязательный параметр. Столбцы должны иметь числовой тип и не должны входить в первичный ключ.
Если `columns` не задан, то RT.WideStore суммирует значения во всех столбцах с числовым типом данных, не входящих в первичный ключ.
Секции запроса
При создании таблицы SummingMergeTree
используются те же секции запроса, что и при создании таблицы MergeTree
.
Устаревший способ создания таблицы
Рассмотрим следующую таблицу:
CREATE TABLE summtt
(
key UInt32,
value UInt32
)
ENGINE = SummingMergeTree()
ORDER BY key
Добавим в неё данные:
INSERT INTO summtt Values(1,1),(1,2),(2,1)
RT.WideStore может не полностью просуммировать все строки (смотрите ниже по тексту), поэтому при запросе мы используем агрегатную функцию sum
и секцию GROUP BY
.
SELECT key, sum(value) FROM summtt GROUP BY key
┌─key─┬─sum(value)─┐
│ 2 │ 1 │
│ 1 │ 3 │
└─────┴─────────────┘
При вставке данных в таблицу они сохраняются как есть. Периодически RT.WideStore выполняет слияние вставленных кусков данных и именно в этот момент производится суммирование и замена многих строк с одинаковым первичным ключом на одну для каждого результирующего куска данных.
RT.WideStore может слить куски данных таким образом, что не все строки с одинаковым первичным ключом окажутся в одном финальном куске, т.е. суммирование будет не полным. Поэтому, при выборке данных (SELECT
) необходимо использовать агрегатную функцию sum() и секцию GROUP BY
как описано в примере выше.
Суммируются значения в столбцах с числовым типом данных. Набор столбцов определяется параметром columns
.
Если значения во всех столбцах для суммирования оказались нулевыми, то строчка удаляется.
Для столбцов, не входящих в первичный ключ и не суммирующихся, выбирается произвольное значение из имеющихся.
Значения для столбцов, входящих в первичный ключ, не суммируются.
Для столбцов типа AggregateFunction RT.WideStore выполняет агрегацию согласно заданной функции, повторяя поведение движка AggregatingMergeTree.
Таблица может иметь вложенные структуры данных, которые обрабатываются особым образом.
Если название вложенной таблицы заканчивается на Map
и она содержит не менее двух столбцов, удовлетворяющих критериям:
(*Int*, Date, DateTime)
или строковый (String, FixedString)
, назовем его условно key
,(*Int*, Float32/64)
, условно (values...)
,то вложенная таблица воспринимается как отображение key => (values...)
и при слиянии её строк выполняется слияние элементов двух множеств по key
со сложением соответствующих (values...)
.
Примеры:
[(1, 100)] + [(2, 150)] -> [(1, 100), (2, 150)]
[(1, 100)] + [(1, 150)] -> [(1, 250)]
[(1, 100)] + [(1, 150), (2, 150)] -> [(1, 250), (2, 150)]
[(1, 100), (2, 150)] + [(1, -100)] -> [(2, 150)]
При запросе данных используйте функцию sumMap(key, value) для агрегации Map
.
Для вложенной структуры данных не нужно указывать её столбцы в кортеже столбцов для суммирования.
Движок наследует функциональность MergeTree, изменяя логику слияния кусков данных. Все строки с одинаковым первичным ключом (точнее, с одинаковым ключом сортировки) RT.WideStore заменяет на одну (в пределах одного куска данных), которая хранит объединение состояний агрегатных функций.
Таблицы типа AggregatingMergeTree
могут использоваться для инкрементальной агрегации данных, в том числе, для агрегирующих материализованных представлений.
Движок обрабатывает все столбцы типа AggregateFunction.
Использование AggregatingMergeTree
оправдано только в том случае, когда это уменьшает количество строк на порядки.
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = AggregatingMergeTree()
[PARTITION BY expr]
[ORDER BY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]
Описание параметров запроса смотрите в описании запроса.
Секции запроса
При создании таблицы AggregatingMergeTree
используются те же секции, что и при создании таблицы MergeTree
.
Устаревший способ создания таблицы
Для вставки данных используйте INSERT SELECT
с агрегатными -State
-функциями.
При выборке данных из таблицы AggregatingMergeTree
, используйте GROUP BY
и те же агрегатные функции, что и при вставке данных, но с суффиксом -Merge
.
В запросах SELECT
значения типа AggregateFunction
выводятся во всех форматах, которые поддерживает RT.WideStore, в виде implementation-specific бинарных данных. Если с помощью SELECT
выполнить дамп данных, например, в формат TabSeparated
, то потом этот дамп можно загрузить обратно с помощью запроса INSERT
.
Создаём материализованное представление типа AggregatingMergeTree
, следящее за таблицей test.visits
:
CREATE MATERIALIZED VIEW test.basic
ENGINE = AggregatingMergeTree() PARTITION BY toYYYYMM(StartDate) ORDER BY (CounterID, StartDate)
AS SELECT
CounterID,
StartDate,
sumState(Sign) AS Visits,
uniqState(UserID) AS Users
FROM test.visits
GROUP BY CounterID, StartDate;
Вставляем данные в таблицу test.visits
:
INSERT INTO test.visits ...
Данные окажутся и в таблице и в представлении test.basic
, которое выполнит агрегацию.
Чтобы получить агрегированные данные, выполним запрос вида SELECT ... GROUP BY ...
из представления test.basic
:
SELECT
StartDate,
sumMerge(Visits) AS Visits,
uniqMerge(Users) AS Users
FROM test.basic
GROUP BY StartDate
ORDER BY StartDate;
Движок наследует функциональность от MergeTree и добавляет в алгоритм слияния кусков данных логику сворачивания (удаления) строк.
CollapsingMergeTree
асинхронно удаляет (сворачивает) пары строк, если все поля в ключе сортировки (ORDER BY
) эквивалентны, за исключением специального поля Sign
, которое может принимать значения 1
и -1
. Строки без пары сохраняются. Подробнее смотрите в разделе Сворачивание (удаление) строк.
Движок может значительно уменьшить объём хранения и, как следствие, повысить эффективность запросов SELECT
.
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = CollapsingMergeTree(sign)
[PARTITION BY expr]
[ORDER BY expr]
[SAMPLE BY expr][SETTINGS name=value, ...]
Подробности про CREATE TABLE
смотрите в описании запроса.
Параметры CollapsingMergeTree
sign
— Имя столбца с типом строки: 1
— строка состояния, -1
— строка отмены состояния.
Тип данных столбца — `Int8`.
Секции запроса
При создании таблицы с движком CollapsingMergeTree
используются те же секции запроса что и при создании таблицы с движком MergeTree
.
Устаревший способ создания таблицы
Рассмотрим ситуацию, когда необходимо сохранять постоянно изменяющиеся данные для какого-либо объекта. Кажется логичным иметь одну строку для объекта и обновлять её при любом изменении, однако операция обновления является дорогостоящей и медленной для СУБД, поскольку требует перезаписи данных в хранилище. Если необходимо быстро записать данные, обновление не допустимо, но можно записать изменения объекта последовательно как описано ниже.
Используйте специальный столбец Sign
. Если Sign = 1
, то это означает, что строка является состоянием объекта, назовём её строкой состояния. Если Sign = -1
, то это означает отмену состояния объекта с теми же атрибутами, назовём её строкой отмены состояния.
Например, мы хотим рассчитать, сколько страниц проверили пользователи на каком-то сайте и как долго они там находились. В какой-то момент времени мы пишем следующую строку с состоянием действий пользователя:
┌───────────────UserID─┬─PageViews─┬─Duration─┬──Sign─┐
│ 4324182021466249494 │ 5 │ 146 │ 1 │
└───────────────────────┴────────────┴───────────┴──────┘
Через некоторое время мы регистрируем изменение активности пользователя и записываем его следующими двумя строками.
┌───────────────UserID─┬─PageViews─┬─Duration─┬─Sign──┐
│ 4324182021466249494 │ 5 │ 146 │ -1 │
│ 4324182021466249494 │ 6 │ 185 │ 1 │
└───────────────────────┴────────────┴───────────┴──────┘
Первая строка отменяет предыдущее состояние объекта (пользователя). Она должна повторять все поля из ключа сортировки для отменённого состояния за исключением Sign
.
Вторая строка содержит текущее состояние.
Поскольку нам нужно только последнее состояние активности пользователя, строки
┌───────────────UserID─┬─PageViews─┬─Duration─┬─Sign─┐
│ 4324182021466249494 │ 5 │ 146 │ 1 │
│ 4324182021466249494 │ 5 │ 146 │ -1 │
└───────────────────────┴────────────┴───────────┴──────┘
можно удалить, сворачивая (удаляя) устаревшее состояние объекта. CollapsingMergeTree
выполняет это при слиянии кусков данных.
Зачем нужны две строки для каждого изменения описано в разделе Алгоритм.
Особенности подхода:
Sign
. Это увеличивает начальный размер хранилища, но позволяет быстро записывать данные.SELECT
сильно зависят от согласованности истории изменений объекта. Будьте точны при подготовке данных для вставки. Можно получить непредсказуемые результаты для несогласованных данных, например отрицательные значения для неотрицательных метрик, таких как глубина сеанса.Во время объединения кусков данных, каждая группа последовательных строк с одинаковым сортировочным ключом (ORDER BY
) уменьшается до не более чем двух строк, одна из которых имеет Sign = 1
(строка состояния), а другая строка с Sign = -1
(строка отмены состояния). Другими словами, записи сворачиваются.
Для каждого результирующего куска данных RT.WideStore сохраняет:
Также, если строк состояния как минимум на 2 больше, чем строк отмены состояния, или, наоборот, строк отмены состояния как минимум на 2 больше, чем строк состояния, то слияние продолжается, но RT.WideStore трактует подобные ситуации как логическую ошибку и записывает её в лог сервера. Подобная ошибка может возникнуть, если один и тот же блок данных вставлен несколько раз.
Как видно, от сворачивания не должны меняться результаты расчётов статистик. Изменения постепенно сворачиваются так, что остаются лишь последнее состояние почти каждого объекта.
Столбец Sign
необходим, поскольку алгоритм слияния не гарантирует, что все строки с одинаковым ключом сортировки будут находиться в одном результирующем куске данных и даже на одном физическом сервере. RT.WideStore выполняет запросы SELECT
несколькими потоками, и он не может предсказать порядок строк в результате. Если необходимо получить полностью свёрнутые данные из таблицы CollapsingMergeTree
, то необходимо агрегирование.
Для завершения свертывания добавьте в запрос секциюGROUP BY
и агрегатные функции, которые учитывают знак. Например, для расчета количества используйте sum(Sign)
вместоcount()
. Чтобы вычислить сумму чего-либо, используйте sum(Sign * x)
вместоsum(х)
, и так далее, а также добавьте HAVING sum(Sign) > 0
.
Таким образом можно вычислять агрегации count
, sum
и avg
. Если объект имеет хотя бы одно не свёрнутое состояние, то может быть вычислена агрегация uniq
. Агрегации min
и max
невозможно вычислить, поскольку CollapsingMergeTree
не сохраняет историю значений свернутых состояний.
Если необходимо выбирать данные без агрегации (например, проверить наличие строк, последние значения которых удовлетворяют некоторым условиям), можно использовать модификатор FINAL
для секции FROM
. Это вариант существенно менее эффективен.
Исходные данные:
┌───────────────UserID─┬─PageViews─┬─Duration─┬─Sign─┐
│ 4324182021466249494 │ 5 │ 146 │ 1 │
│ 4324182021466249494 │ 5 │ 146 │ -1 │
│ 4324182021466249494 │ 6 │ 185 │ 1 │
└───────────────────────┴────────────┴───────────┴──────┘
Создание таблицы:
CREATE TABLE UAct
(
UserID UInt64,
PageViews UInt8,
Duration UInt8,
Sign Int8
)
ENGINE = CollapsingMergeTree(Sign)
ORDER BY UserID
Добавление данных:
INSERT INTO UAct VALUES (4324182021466249494, 5, 146, 1)
INSERT INTO UAct VALUES (4324182021466249494, 5, 146, -1),(4324182021466249494, 6, 185, 1)
Мы используем два запроса INSERT
для создания двух различных кусков данных. Если вставить данные одним запросом, RT.WideStore создаёт один кусок данных и никогда не будет выполнять слияние.
Получение данных:
SELECT * FROM UAct
┌───────────────UserID─┬─PageViews─┬─Duration─┬─Sign─┐
│ 4324182021466249494 │ 5 │ 146 │ -1 │
│ 4324182021466249494 │ 6 │ 185 │ 1 │
└───────────────────────┴────────────┴───────────┴──────┘
┌───────────────UserID─┬─PageViews─┬─Duration──┬─Sign─┐
│ 4324182021466249494 │ 5 │ 146 │ 1 │
└───────────────────────┴────────────┴───────────┴──────┘
Что мы видим и где сворачивание?
Двумя запросами INSERT
, мы создали два куска данных. Запрос SELECT
был выполнен в 2 потока, и мы получили случайный порядок строк. Сворачивание не произошло, так как слияние кусков данных еще не произошло. RT.WideStore объединяет куски данных в неизвестный момент времени, который мы не можем предсказать.
Таким образом, нам нужна агрегация:
SELECT
UserID,
sum(PageViews * Sign) AS PageViews,
sum(Duration * Sign) AS Duration
FROM UActGROUP BY UserID
HAVING sum(Sign) > 0
┌───────────────UserID─┬─PageViews─┬─Duration─┐
│ 4324182021466249494 │ 6 │ 185 │
└───────────────────────┴────────────┴───────────┘
Если нам не нужна агрегация, но мы хотим принудительно выполнить свёртку данных, можно использовать модификатор FINAL
для секции FROM
.
SELECT * FROM UAct FINAL
┌───────────────UserID─┬─PageViews─┬─Duration─┬──Sign─┐
│ 4324182021466249494 │ 6 │ 185 │ 1 │
└───────────────────────┴────────────┴───────────┴──────┘
Такой способ выбора данных очень неэффективен. Не используйте его для больших таблиц.
Исходные данные:
┌───────────────UserID─┬─PageViews─┬─Duration─┬─Sign─┐
│ 4324182021466249494 │ 5 │ 146 │ 1 │
│ 4324182021466249494 │ -5 │ -146 │ -1 │
│ 4324182021466249494 │ 6 │ 185 │ 1 │
└───────────────────────┴────────────┴───────────┴──────┘
Идея состоит в том, что слияния при сворачивании учитывают только ключевые поля, поэтому в отменяющей строке можно указать отрицательные значения, которые нивелируют предыдущую версию записи при суммировании без учета поля Sign. Для этого подхода необходимо изменить тип данных PageViews
, Duration
для хранения отрицательных значений UInt8 -> Int16.
CREATE TABLE UAct
(
UserID UInt64,
PageViews Int16,
Duration Int16,
Sign Int8
)
ENGINE = CollapsingMergeTree(Sign)
ORDER BY UserID
Тестируем подход:
insert into UAct values(4324182021466249494, 5, 146, 1);
insert into UAct values(4324182021466249494, -5, -146, -1);
insert into UAct values(4324182021466249494, 6, 185, 1);
select * from UAct final; // старайтесь не использовать final (он подходит только для тестов и маленьких таблиц)
┌───────────────UserID─┬─PageViews─┬─Duration─┬──Sign─┐
│ 4324182021466249494 │ 6 │ 185 │ 1 │
└───────────────────────┴────────────┴───────────┴──────┘
SELECT
UserID,
sum(PageViews) AS PageViews,
sum(Duration) AS Duration
FROM UAct
GROUP BY UserID
┌───────────────UserID─┬─PageViews─┬─Duration─┐
│ 4324182021466249494 │ 6 │ 185 │
└────────────────────────┴───────────┴───────────┘
select count() FROM UAct
┌─count()─┐
│ 3 │
└──────────┘
optimize table UAct final;
select * FROM UAct
┌───────────────UserID─┬─PageViews─┬─Duration──┬─Sign─┐
│ 4324182021466249494 │ 6 │ 185 │ 1 │
└───────────────────────┴────────────┴──────────┴───────┘
Движок:
Подробнее читайте в разделе Collapsing.
Движок наследует функциональность от MergeTree и добавляет в алгоритм слияния кусков данных логику сворачивания (удаления) строк. VersionedCollapsingMergeTree
предназначен для тех же задач, что и CollapsingMergeTree, но использует другой алгоритм свёртывания, который позволяет вставлять данные в любом порядке в несколько потоков. В частности, столбец Version
помогает свернуть строки правильно, даже если они вставлены в неправильном порядке. CollapsingMergeTree
требует строго последовательную вставку данных.
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = VersionedCollapsingMergeTree(sign, version)
[PARTITION BY expr]
[ORDER BY expr]
[SAMPLE BY expr][SETTINGS name=value, ...]
Подробности про CREATE TABLE
смотрите в описании запроса.
Параметры движка:
VersionedCollapsingMergeTree(sign, version)
sign
— Имя столбца с типом строки: 1
— строка состояния, -1
— строка отмены состояния.
Тип данных столбца должен быть `Int8`.
version
— имя столбца с версией состояния объекта.
Тип данных столбца должен быть `UInt*`.
Секции запроса:
При создании таблицы VersionedСollapsingMergeTree
используются те же секции запроса, что и при создании таблицы MergeTree
.
Устаревший способ создания таблицы
Рассмотрим ситуацию, когда необходимо сохранять постоянно изменяющиеся данные для какого-либо объекта. Разумно иметь одну строку для объекта и обновлять эту строку при каждом изменении. Однако операция обновления является дорогостоящей и медленной для СУБД, поскольку требует перезаписи данных в хранилище. Обновление неприемлемо, если требуется быстро записывать данные, но можно записывать изменения в объект последовательно следующим образом.
Используйте столбец Sign
при записи строки. Если Sign = 1
, то это означает, что строка является состоянием объекта, назовём её строкой состояния. Если Sign = -1
, то это означает отмену состояния объекта с теми же атрибутами, назовём её строкой отмены состояния. Также используйте столбец Version
, который должен идентифицировать каждое состояние объекта отдельным номером.
Например, мы хотим рассчитать, сколько страниц пользователи посетили на каком-либо сайте и как долго они там находились. В какой-то момент времени мы записываем следующую строку состояния пользовательской активности:
┌───────────────UserID─┬─PageViews─┬─Duration─┬─Sign──┬─Version─┐
│ 4324182021466249494 │ 5 │ 146 │ 1 │ 1 |
└───────────────────────┴────────────┴───────────┴──────┴──────────┘
Через некоторое время мы регистрируем изменение активности пользователя и записываем его следующими двумя строками.
┌───────────────UserID─┬─PageViews─┬─Duration─┬─Sign──┬─Version─┐
│ 4324182021466249494 │ 5 │ 146 │ -1 │ 1 |
│ 4324182021466249494 │ 6 │ 185 │ 1 │ 2 |
└───────────────────────┴────────────┴───────────┴──────┴──────────┘
Первая строка отменяет предыдущее состояние объекта (пользователя). Она должна копировать все поля отменяемого состояния за исключением Sign
.
Вторая строка содержит текущее состояние.
Поскольку нам нужно только последнее состояние активности пользователя, строки
┌───────────────UserID─┬─PageViews─┬─Duration─┬─Sign──┬─Version─┐
│ 4324182021466249494 │ 5 │ 146 │ 1 │ 1 |
│ 4324182021466249494 │ 5 │ 146 │ -1 │ 1 |
└───────────────────────┴────────────┴───────────┴──────┴──────────┘
можно удалить, сворачивая (удаляя) устаревшее состояние объекта. VersionedCollapsingMergeTree
делает это при слиянии кусков данных.
Чтобы узнать, зачем нам нужны две строки для каждого изменения, см. раздел Алгоритм.
Примечания по использованию:
Sign
. Это увеличивает начальный размер хранилища, но позволяет быстро записывать данные.SELECT
результаты сильно зависят от согласованности истории изменений объекта. Будьте точны при подготовке данных для вставки. Вы можете получить непредсказуемые результаты с несогласованными данными, такими как отрицательные значения для неотрицательных метрик, таких как глубина сеанса.Когда RT.WideStore объединяет куски данных, он удаляет каждую пару строк, которые имеют один и тот же первичный ключ и версию и разный Sign
. Порядок строк не имеет значения.
Когда RT.WideStore вставляет данные, он упорядочивает строки по первичному ключу. Если столбец Version
не находится в первичном ключе, RT.WideStore добавляет его к первичному ключу неявно как последнее поле и использует для сортировки.
RT.WideStore не гарантирует, что все строки с одинаковым первичным ключом будут находиться в одном результирующем куске данных или даже на одном физическом сервере. Это справедливо как для записи данных, так и для последующего слияния кусков данных. Кроме того, RT.WideStore обрабатывает запросы SELECT
несколькими потоками, и не может предсказать порядок строк в конечной выборке. Это означает, что если необходимо получить полностью «свернутые» данные из таблицы VersionedCollapsingMergeTree
, то требуется агрегирование.
Для завершения свертывания добавьте в запрос секцию GROUP BY
и агрегатные функции, которые учитывают знак. Например, для расчета количества используйте sum(Sign)
вместоcount()
. Чтобы вычислить сумму чего-либо, используйте sum(Sign * x)
вместоsum(х)
, а также добавьте HAVING sum(Sign) > 0
.
Таким образом можно вычислять агрегации count
, sum
и avg
. Агрегация uniq
может вычисляться, если объект имеет хотя бы одно не свернутое состояние. Невозможно вычислить агрегации min
и max
посколькуVersionedCollapsingMergeTree
не сохраняет историю значений для свернутых состояний.
Если необходимо выбирать данные без агрегации (например, проверить наличие строк, последние значения которых удовлетворяют некоторым условиям), можно использовать модификатор FINAL
для секции FROM
. Такой подход неэффективен и не должен использоваться с большими таблицами.
Данные для примера:
┌───────────────UserID─┬─PageViews─┬─Duration─┬─Sign──┬─Version─┐
│ 4324182021466249494 │ 5 │ 146 │ 1 │ 1 |
│ 4324182021466249494 │ 5 │ 146 │ -1 │ 1 |
│ 4324182021466249494 │ 6 │ 185 │ 1 │ 2 |
└───────────────────────┴────────────┴───────────┴──────┴──────────┘
Создание таблицы:
CREATE TABLE UAct
(
UserID UInt64,
PageViews UInt8,
Duration UInt8,
Sign Int8,
Version UInt8
)
ENGINE = VersionedCollapsingMergeTree(Sign, Version)
ORDER BY UserID
Вставка данных:
INSERT INTO UAct VALUES (4324182021466249494, 5, 146, 1, 1)
INSERT INTO UAct VALUES (4324182021466249494, 5, 146, -1, 1),(4324182021466249494, 6, 185, 1, 2)
Мы используем два запроса INSERT
для создания двух различных кусков данных. Если мы вставляем данные с помощью одного запроса, RT.WideStore создаёт один кусок данных и не будет выполнять слияние.
Получение данных:
SELECT * FROM UAct
┌───────────────UserID─┬─PageViews─┬─Duration─┬─Sign──┬─Version─┐
│ 4324182021466249494 │ 5 │ 146 │ 1 │ 1 │
└───────────────────────┴────────────┴───────────┴──────┴──────────┘
┌───────────────UserID─┬─PageViews─┬──Duration─┬─Sign─┬─Version─┐
│ 4324182021466249494 │ 5 │ 146 │ -1 │ 1 │
│ 4324182021466249494 │ 6 │ 185 │ 1 │ 2 │
└───────────────────────┴────────────┴───────────┴──────┴──────────┘
Что мы видим и где сворачивание? Мы создали два куска данных, используя два запроса INSERT
. Запрос SELECT
был выполнен в два потока, и результатом является случайный порядок строк. Свертывание не произошло, поскольку части данных еще не были объединены. RT.WideStore объединяет части данных в неизвестный момент времени, который мы не можем предсказать.
Поэтому нам нужна агрегация:
SELECT
UserID,
sum(PageViews * Sign) AS PageViews,
sum(Duration * Sign) AS Duration,
Version
FROM UAct
GROUP BY UserID, Version
HAVING sum(Sign) > 0
┌───────────────UserID─┬─PageViews─┬─Duration─┬──Version─┐
│ 4324182021466249494 │ 6 │ 185 │ 2 │
└───────────────────────┴────────────┴──────────┴───────────┘
Если нам не нужна агрегация, но мы хотим принудительно выполнить свёртку данных, то можно использовать модификатор FINAL
для секции FROM
.
SELECT * FROM UAct FINAL
┌───────────────UserID─┬─PageViews─┬─Duration─┬─Sign──┬─Version─┐
│ 4324182021466249494 │ 6 │ 185 │ 1 │ 2 │
└───────────────────────┴────────────┴───────────┴──────┴──────────┘
Это очень неэффективный способ выбора данных. Не используйте его для больших таблиц.
Движок предназначен для прореживания и агрегирования/усреднения (rollup) данных Graphite. Он может быть интересен разработчикам, которые хотят использовать RT.WideStore как хранилище данных для Graphite.
Если rollup не требуется, то для хранения данных Graphite можно использовать любой движок таблиц RT.WideStore, в противном случае используйте GraphiteMergeTree
. Движок уменьшает объём хранения и повышает эффективность запросов от Graphite.
Движок наследует свойства от MergeTree.
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
Path String,
Time DateTime,
Value <Numeric_type>,
Version <Numeric_type>
...
) ENGINE = GraphiteMergeTree(config_section)
[PARTITION BY expr]
[ORDER BY expr]
[SAMPLE BY expr]
[SETTINGS name=value, ...]
Смотрите описание запроса CREATE TABLE.
В таблице должны быть столбцы для следующих данных:
Название метрики (сенсора Graphite). Тип данных: String
.
Время измерения метрики. Тип данных DateTime
.
Значение метрики. Тип данных: любой числовой.
Версия метрики. Тип данных: любой числовой (RT.WideStore сохраняет строки с последней версией или последнюю записанную строку, если версии совпадают. Другие строки удаляются при слиянии кусков данных).
Имена этих столбцов должны быть заданы в конфигурации rollup.
Параметры GraphiteMergeTree:
config_section
— имя раздела в конфигурационном файле, в котором находятся правила rollup.Секции запроса:
При создании таблицы GraphiteMergeTree
используются те же секции запроса, что и при создании таблицы MergeTree
.
Устаревший способ создания таблицы
Настройки прореживания данных задаются параметром graphite_rollup в конфигурации сервера . Имя параметра может быть любым. Можно создать несколько конфигураций и использовать их для разных таблиц.
Структура конфигурации rollup:
required-columns
patterns
path_column_name
— столбец, в котором хранится название метрики (сенсор Graphite). Значение по умолчанию: Path
.time_column_name
— столбец, в котором хранится время измерения метрики. Значение по умолчанию: Time
.value_column_name
— столбец со значением метрики в момент времени, установленный в time_column_name
. Значение по умолчанию: Value
.version_column_name
— столбец, в котором хранится версия метрики. Значение по умолчанию: Timestamp
.Структура раздела patterns
:
pattern
rule_type
regexp
function
pattern
rule_type
regexp
age + precision
...
pattern
rule_type
regexp
function
age + precision
...
pattern
...
default
function
age + precision
...
ВНИМАНИЕ:
Правила должны быть строго упорядочены:
- Правила без `function` или `retention`.
- Правила одновременно содержащие `function` и `retention`.
- Правило `default`.
При обработке строки RT.WideStore проверяет правила в разделе pattern
. Каждый pattern
(включая default
) может содержать параметр агрегации function
, параметр retention
, или оба параметра одновременно. Если имя метрики соответствует шаблону regexp
, то применяются правила pattern
, в противном случае правило default
.
Поля для разделов pattern
и default
:
rule_type
- тип правила (применяется только к метрикам указанных типов), используется для разделения правил проверки плоских/теггированных метрик. Опциональное поле. Значение по умолчанию: all
.- `all` (default) - универсальное правило, назначается также по умолчанию, если поле не задано
- `plain` - правило для плоских метрик (без тегов). Поле `regexp` обрабатывается как регулярное выражение.
- `tagged` - правило для теггированных метрик (метрика хранится в БД в формате `someName?tag1=value1&tag2=value2&tag3=value3`), регулярное выражение должно быть отсортированно по именам тегов, первым - значение тега `__name__`, если есть. Поле `regexp` обрабатывается как регулярное выражение.
- `tag_list` - правило для теггированных метрик, простой DSL для упрощения задания регулярного выражения в формате тегов graphite `someName;tag1=value1;tag2=value2`, `someName` или `tag1=value1;tag2=value2`. Поле `regexp` транслируется в правило `tagged`. Cортировать по именам тегов не обязательно, оно отсортируется автоматически. Значение тега (но не имя) может быть регулярным выражением (например `env=(dev|staging)`).
regexp
– шаблон имени метрики (регулярное выражение или DSL).age
– минимальный возраст данных в секундах.precision
– точность определения возраста данных в секундах. Должен быть делителем для 86400 (количество секунд в сутках).function
– имя агрегирующей функции, которую следует применить к данным, чей возраст оказался в интервале [age, age + precision]
. Допустимые функции: min/max/any/avg. Avg вычисляется неточно, как среднее от средних.<graphite_rollup>
<version_column_name>Version</version_column_name>
<pattern>
<regexp>click_cost</regexp>
<function>any</function>
<retention>
<age>0</age>
<precision>5</precision>
</retention>
<retention>
<age>86400</age>
<precision>60</precision>
</retention>
</pattern>
<default>
<function>max</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>3600</age>
<precision>300</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</default>
</graphite_rollup>
<graphite_rollup>
<version_column_name>Version</version_column_name>
<pattern>
<rule_type>plain</rule_type>
<regexp>click_cost</regexp>
<function>any</function>
<retention>
<age>0</age>
<precision>5</precision>
</retention>
<retention>
<age>86400</age>
<precision>60</precision>
</retention>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp>^((.*)|.)min\?</regexp>
<function>min</function>
<retention>
<age>0</age>
<precision>5</precision>
</retention>
<retention>
<age>86400</age>
<precision>60</precision>
</retention>
</pattern>
<pattern>
<rule_type>tagged</rule_type>
<regexp><![CDATA[^someName\?(.*&)*tag1=value1(&|$)]]></regexp>
<function>min</function>
<retention>
<age>0</age>
<precision>5</precision>
</retention>
<retention>
<age>86400</age>
<precision>60</precision>
</retention>
</pattern>
<pattern>
<rule_type>tag_list</rule_type>
<regexp>someName;tag2=value2</regexp>
<retention>
<age>0</age>
<precision>5</precision>
</retention>
<retention>
<age>86400</age>
<precision>60</precision>
</retention>
</pattern>
<default>
<function>max</function>
<retention>
<age>0</age>
<precision>60</precision>
</retention>
<retention>
<age>3600</age>
<precision>300</precision>
</retention>
<retention>
<age>86400</age>
<precision>3600</precision>
</retention>
</default>
</graphite_rollup>
Прореживание данных производится во время слияний. Обычно для старых партиций слияния не запускаются, поэтому для прореживания надо инициировать незапланированное слияние используя optimize. Или использовать дополнительные инструменты, например graphite-ch-optimizer.
Репликация поддерживается только для таблиц семейства MergeTree:
Репликация работает на уровне отдельных таблиц, а не всего сервера. То есть, на сервере могут быть расположены одновременно реплицируемые и не реплицируемые таблицы.
Репликация не зависит от шардирования. На каждом шарде репликация работает независимо.
Реплицируются сжатые данные запросов INSERT
, ALTER
(см. подробности в описании запроса ALTER).
Запросы CREATE
, DROP
, ATTACH
, DETACH
и RENAME
выполняются на одном сервере и не реплицируются:
CREATE TABLE
создаёт новую реплицируемую таблицу на том сервере, где его выполнили. Если таблица уже существует на других серверах, запрос добавляет новую реплику.DROP TABLE
удаляет реплику, расположенную на том сервере, где выполняется запрос.RENAME
переименовывает таблицу на одной реплик. Другими словами, реплицируемые таблицы на разных репликах могут называться по-разному.ClickHouse хранит метаинформацию о репликах в Apache ZooKeeper. Используйте ZooKeeper 3.4.5 или новее.
Для использовании репликации, установите параметры в секции zookeeper конфигурации сервера.
Внимание:
Не пренебрегайте настройками безопасности. ClickHouse поддерживает [ACL схему](https://zookeeper.apache.org/doc/current/zookeeperProgrammers.html#sc_ZooKeeperAccessControl) `digest` подсистемы безопасности ZooKeeper.
Пример указания адресов кластера ZooKeeper:
<zookeeper>
<node index="1">
<host>example1</host>
<port>2181</port>
</node>
<node index="2">
<host>example2</host>
<port>2181</port>
</node>
<node index="3">
<host>example3</host>
<port>2181</port>
</node>
</zookeeper>
Можно указать любой имеющийся у вас ZooKeeper-кластер - система будет использовать в нём одну директорию для своих данных (директория указывается при создании реплицируемой таблицы).
Если в конфигурационном файле не настроен ZooKeeper, то вы не сможете создать реплицируемые таблицы, а уже имеющиеся реплицируемые таблицы будут доступны в режиме только на чтение.
При запросах SELECT
, ZooKeeper не используется, т.е. репликация не влияет на производительность SELECT
и запросы работают так же быстро, как и для нереплицируемых таблиц. При запросах к распределенным реплицированным таблицам поведение ClickHouse регулируется настройками max_replica_delay_for_distributed_queries and fallback_to_stale_replicas_for_distributed_queries.
При каждом запросе INSERT
, делается около десятка записей в ZooKeeper в рамках нескольких транзакций. (Чтобы быть более точным, это для каждого вставленного блока данных; запрос INSERT содержит один блок или один блок на max_insert_block_size = 1048576
строк.) Это приводит к некоторому увеличению задержек при INSERT
, по сравнению с нереплицируемыми таблицами. Но если придерживаться обычных рекомендаций - вставлять данные пачками не более одного INSERT
в секунду, то это не составляет проблем. На всём кластере ClickHouse, использующим для координации один кластер ZooKeeper, может быть в совокупности несколько сотен INSERT
в секунду. Пропускная способность при вставке данных (количество строчек в секунду) такая же высокая, как для нереплицируемых таблиц.
Для очень больших кластеров, можно использовать разные кластеры ZooKeeper для разных шардов. Впрочем, на кластере Яндекс.Метрики (примерно 300 серверов) такой необходимости не возникает.
Репликация асинхронная, мульти-мастер. Запросы INSERT
и ALTER
можно направлять на любой доступный сервер. Данные вставятся на сервер, где выполнен запрос, а затем скопируются на остальные серверы. В связи с асинхронностью, только что вставленные данные появляются на остальных репликах с небольшой задержкой. Если часть реплик недоступна, данные на них запишутся тогда, когда они станут доступны. Если реплика доступна, то задержка составляет столько времени, сколько требуется для передачи блока сжатых данных по сети. Количество потоков для выполнения фоновых задач можно задать с помощью настройки background_schedule_pool_size.
Движок ReplicatedMergeTree
использует отдельный пул потоков для скачивания кусков данных. Размер пула ограничен настройкой background_fetches_pool_size, которую можно указать при перезапуске сервера.
По умолчанию, запрос INSERT ждёт подтверждения записи только от одной реплики. Если данные были успешно записаны только на одну реплику, и сервер с этой репликой перестал существовать, то записанные данные будут потеряны. Вы можете включить подтверждение записи от нескольких реплик, используя настройку insert_quorum
.
Каждый блок данных записывается атомарно. Запрос INSERT разбивается на блоки данных размером до max_insert_block_size = 1048576
строк. То есть, если в запросе INSERT
менее 1048576 строк, то он делается атомарно.
Блоки данных дедуплицируются. При многократной записи одного и того же блока данных (блоков данных одинакового размера, содержащих одни и те же строчки в одном и том же порядке), блок будет записан только один раз. Это сделано для того, чтобы в случае сбоя в сети, когда клиентское приложение не может понять, были ли данные записаны в БД, можно было просто повторить запрос INSERT
. При этом не имеет значения, на какую реплику будут отправлены INSERT-ы с одинаковыми данными. Запрос INSERT
идемпотентный. Параметры дедуплицирования регулируются настройками сервера merge_tree
При репликации, по сети передаются только исходные вставляемые данные. Дальнейшие преобразования данных (слияния) координируются и делаются на всех репликах одинаковым образом. За счёт этого минимизируется использование сети, и благодаря этому, репликация хорошо работает при расположении реплик в разных дата-центрах. (Стоит заметить, что дублирование данных в разных дата-центрах, по сути, является основной задачей репликации).
Количество реплик одних и тех же данных может быть произвольным. В Яндекс.Метрике в продакшене используется двукратная репликация. На каждом сервере используется RAID-5 или RAID-6, в некоторых случаях RAID-10. Это является сравнительно надёжным и удобным для эксплуатации решением.
Система следит за синхронностью данных на репликах и умеет восстанавливаться после сбоя. Восстановление после сбоя автоматическое (в случае небольших различий в данных) или полуавтоматическое (когда данные отличаются слишком сильно, что может свидетельствовать об ошибке конфигурации).
В начало имени движка таблицы добавляется Replicated
. Например, ReplicatedMergeTree
.
Параметры Replicated*MergeTree
zoo_path
— путь к таблице в ZooKeeper.replica_name
— имя реплики в ZooKeeper.other_parameters
— параметры движка, для которого создаётся реплицированная версия, например, версия для ReplacingMergeTree
.Пример:
CREATE TABLE table_name
(
EventDate DateTime,
CounterID UInt32,
UserID UInt32,
ver UInt16) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{layer}-{shard}/table_name', '{replica}', ver)
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID);
Пример в устаревшем синтаксисе
Как видно в примере, эти параметры могут содержать подстановки в фигурных скобках. Эти подстановки заменяются на соответствующие значения из конфигурационного файла, из секции macros.
Пример:
<macros>
<layer>05</layer>
<shard>02</shard>
<replica>example05-02-1.yandex.ru</replica>
</macros>
Путь к таблице в ZooKeeper должен быть разным для каждой реплицируемой таблицы. В том числе, для таблиц на разных шардах, должны быть разные пути. В данном случае, путь состоит из следующих частей:
/clickhouse/tables/
— общий префикс. Рекомендуется использовать именно его.
{layer}-{shard}
— идентификатор шарда. В данном примере он состоит из двух частей, так как на кластере Яндекс.Метрики используется двухуровневое шардирование. Для большинства задач, оставьте только подстановку {shard}, которая будет раскрываться в идентификатор шарда.
table_name
- имя узла для таблицы в ZooKeeper. Разумно делать его таким же, как имя таблицы. Оно указывается явно, так как, в отличие от имени таблицы, оно не меняется после запроса RENAME. Подсказка: можно также указать имя базы данных перед table_name
, например db_name.table_name
Можно использовать две встроенных подстановки {database}
и {table}
, они раскрываются в имя таблицы и в имя базы данных соответственно (если эти подстановки не переопределены в секции macros
). Т.о. Zookeeper путь можно задать как '/clickhouse/tables/{layer}-{shard}/{database}/{table}'
. Будьте осторожны с переименованиями таблицы при использовании этих автоматических подстановок. Путь в Zookeeper-е нельзя изменить, а подстановка при переименовании таблицы раскроется в другой путь, таблица будет обращаться к несуществующему в Zookeeper-е пути и перейдет в режим только для чтения.
Имя реплики — то, что идентифицирует разные реплики одной и той же таблицы. Можно использовать для него имя сервера, как показано в примере. Впрочем, достаточно, чтобы имя было уникально лишь в пределах каждого шарда.
Можно не использовать подстановки, а указать соответствующие параметры явно. Это может быть удобным для тестирования и при настройке маленьких кластеров. Однако в этом случае нельзя пользоваться распределенными DDL-запросами (ON CLUSTER
).
При работе с большими кластерами мы рекомендуем использовать подстановки, они уменьшают вероятность ошибки.
Можно указать аргументы по умолчанию для движка реплицируемых таблиц в файле конфигурации сервера.
<default_replica_path>/clickhouse/tables/{shard}/{database}/{table}</default_replica_path>
<default_replica_name>{replica}</default_replica_name>
В этом случае можно опустить аргументы при создании таблиц:
CREATE TABLE table_name (
x UInt32
) ENGINE = ReplicatedMergeTree
ORDER BY x;
Это будет эквивалентно следующему запросу:
CREATE TABLE table_name (
x UInt32
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/table_name', '{replica}')
ORDER BY x;
Выполните запрос CREATE TABLE
на каждой реплике. Запрос создаёт новую реплицируемую таблицу, или добавляет новую реплику к имеющимся.
Если вы добавляете новую реплику после того, как таблица на других репликах уже содержит некоторые данные, то после выполнения запроса, данные на новую реплику будут скачаны с других реплик. То есть, новая реплика синхронизирует себя с остальными.
Для удаления реплики, выполните запрос DROP TABLE
. При этом, удаляется только одна реплика — расположенная на том сервере, где вы выполняете запрос.
Если при старте сервера, недоступен ZooKeeper, реплицируемые таблицы переходят в режим только для чтения. Система будет пытаться периодически установить соединение с ZooKeeper.
Если при INSERT
недоступен ZooKeeper, или происходит ошибка при взаимодействии с ним, будет выкинуто исключение.
При подключении к ZooKeeper, система проверяет соответствие между имеющимся в локальной файловой системе набором данных и ожидаемым набором данных (информация о котором хранится в ZooKeeper). Если имеются небольшие несоответствия, то система устраняет их, синхронизируя данные с реплик.
Обнаруженные битые куски данных (с файлами несоответствующего размера) или неизвестные куски (куски, записанные в файловую систему, но информация о которых не была записана в ZooKeeper) переносятся в поддиректорию detached (не удаляются). Недостающие куски скачиваются с реплик.
Стоит заметить, что ClickHouse не делает самостоятельно никаких деструктивных действий типа автоматического удаления большого количества данных.
При старте сервера (или создании новой сессии с ZooKeeper), проверяется только количество и размеры всех файлов. Если у файлов совпадают размеры, но изменены байты где-то посередине, то это обнаруживается не сразу, а только при попытке их прочитать при каком-либо запросе SELECT
. Запрос кинет исключение о несоответствующей чексумме или размере сжатого блока. В этом случае, куски данных добавляются в очередь на проверку, и при необходимости, скачиваются с реплик.
Если обнаруживается, что локальный набор данных слишком сильно отличается от ожидаемого, то срабатывает защитный механизм. Сервер сообщает об этом в лог и отказывается запускаться. Это сделано, так как такой случай может свидетельствовать об ошибке конфигурации - например, если реплика одного шарда была случайно сконфигурирована, как реплика другого шарда. Тем не менее, пороги защитного механизма поставлены довольно низкими, и такая ситуация может возникнуть и при обычном восстановлении после сбоя. В этом случае, восстановление делается полуавтоматически - «по кнопке».
Для запуска восстановления, создайте в ZooKeeper узел /path_to_table/replica_name/flags/force_restore_data
с любым содержимым или выполните команду для восстановления всех реплицируемых таблиц:
$ sudo -u clickhouse touch /var/lib/clickhouse/flags/force_restore_data
Затем запустите сервер. При старте, сервер удалит эти флаги и запустит восстановление.
Если на одном из серверов исчезли все данные и метаданные, восстановление делается следующим образом:
/var/lib/clickhouse/data/db_name/table_name/
) с реплики./var/lib/clickhouse/metadata/
. Если в определениях таблиц, идентификатор шарда или реплики, прописаны в явном виде - исправьте их, чтобы они соответствовали данной реплике. (Альтернативный вариант - запустить сервер и сделать самостоятельно все запросы ATTACH TABLE
, которые должны были бы быть в соответствующих .sql файлах в /var/lib/clickhouse/metadata/
.)/path_to_table/replica_name/flags/force_restore_data
с любым содержимым или выполните команду для восстановления всех реплицируемых таблиц: sudo -u clickhouse touch /var/lib/clickhouse/flags/force_restore_data
Затем запустите сервер (перезапустите, если уже запущен). Данные будут скачаны с реплик.
В качестве альтернативного варианта восстановления, вы можете удалить из ZooKeeper информацию о потерянной реплике (/path_to_table/replica_name
), и затем создать реплику заново, как написано в разделе Создание реплицированных таблиц .
Отсутствует ограничение на использование сетевой полосы при восстановлении. Имейте это ввиду, если восстанавливаете сразу много реплик.
Здесь и далее, под MergeTree
подразумеваются все движки таблиц семейства MergeTree
, так же для ReplicatedMergeTree
.
Если у вас была таблица типа MergeTree
, репликация которой делалась вручную, вы можете преобразовать её в реплицируемую таблицу. Это может понадобиться лишь в случаях, когда вы уже успели накопить большое количество данных в таблице типа MergeTree
, а сейчас хотите включить репликацию.
Если на разных репликах данные отличаются, то сначала синхронизируйте их, либо удалите эти данные на всех репликах кроме одной.
Переименуйте имеющуюся MergeTree таблицу, затем создайте со старым именем таблицу типа ReplicatedMergeTree
. Перенесите данные из старой таблицы в поддиректорию detached в директории с данными новой таблицы (/var/lib/clickhouse/data/db_name/table_name/
). Затем добавьте эти куски данных в рабочий набор с помощью выполнения запросов ALTER TABLE ATTACH PARTITION
на одной из реплик.
Создайте таблицу типа MergeTree с другим именем. Перенесите в её директорию с данными все данные из директории с данными таблицы типа ReplicatedMergeTree
. Затем удалите таблицу типа ReplicatedMergeTree
и перезапустите сервер.
Если вы хотите избавиться от таблицы ReplicatedMergeTree
, не запуская сервер, то
.sql
в директории с метаданными (/var/lib/clickhouse/metadata/
);/path_to_table/replica_name
);После этого, вы можете запустить сервер, создать таблицу типа MergeTree
, перенести данные в её директорию, и перезапустить сервер.
Если данные в ZooKeeper оказались утеряны или повреждены, то вы можете сохранить данные, переместив их в нереплицируемую таблицу, как описано в пункте выше.
Смотрите также:
Партиционирование данных доступно для таблиц семейства MergeTree (включая реплицированные таблицы). Таблицы MaterializedView, созданные на основе таблиц MergeTree, также поддерживают партиционирование.
Партиция – это набор записей в таблице, объединенных по какому-либо критерию. Например, партиция может быть по месяцу, по дню или по типу события. Данные для разных партиций хранятся отдельно. Это позволяет оптимизировать работу с данными, так как при обработке запросов будет использоваться только необходимое подмножество из всевозможных данных. Например, при получении данных за определенный месяц, ClickHouse будет считывать данные только за этот месяц.
Ключ партиционирования задается при создании таблицы, в секции PARTITION BY expr
. Ключ может представлять собой произвольное выражение из столбцов таблицы. Например, чтобы задать партиционирования по месяцам, можно использовать выражение toYYYYMM(date_column)
:
CREATE TABLE visits
(
VisitDate Date,
Hour UInt8,
ClientID UUID
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(VisitDate)
ORDER BY Hour
Ключом партиционирования также может быть кортеж из выражений (аналогично первичному ключу). Например:
ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/name', 'replica1', Sign)
PARTITION BY (toMonday(StartDate), EventType)
ORDER BY (CounterID, StartDate, intHash32(UserID));
В этом примере задано партиционирование по типам событий, произошедших в течение текущей недели.
По умолчанию, ключ партиционирования с плавающей запятой не поддерживается. Чтобы использовать его, включите настройку allow_floating_point_partition_key.
Каждая партиция состоит из отдельных фрагментов или так называемых кусков данных. Каждый кусок отсортирован по первичному ключу. При вставке данных в таблицу каждая отдельная запись сохраняется в виде отдельного куска. Через некоторое время после вставки (обычно до 10 минут), ClickHouse выполняет в фоновом режиме слияние данных — в результате куски для одной и той же партиции будут объединены в более крупный кусок.
Примечание:
Не рекомендуется делать слишком гранулированное партиционирование – то есть задавать партиции по столбцу, в котором будет слишком большой разброс значений (речь идет о порядке более тысячи партиций). Это приведет к скоплению большого числа файлов и файловых дескрипторов в системе, что может значительно снизить производительность запросов `SELECT`.
Чтобы получить набор кусков и партиций таблицы, можно воспользоваться системной таблицей system.parts. В качестве примера рассмотрим таблицу visits
, в которой задано партиционирование по месяцам. Выполним SELECT
для таблицы system.parts
:
SELECT
partition,
name,
active
FROM system.parts
WHERE table = 'visits'
┌─partition─┬─name───────────────┬─active─┐
│ 201901 │ 201901_1_3_1 │ 0 │
│ 201901 │ 201901_1_9_2_11 │ 1 │
│ 201901 │ 201901_8_8_0 │ 0 │
│ 201901 │ 201901_9_9_0 │ 0 │
│ 201902 │ 201902_4_6_1_11 │ 1 │
│ 201902 │ 201902_10_10_0_11 │ 1 │
│ 201902 │ 201902_11_11_0_11 │ 1 │
└────────────┴─────────────────────┴────────┘
Столбец partition
содержит имена всех партиций таблицы. Таблица visits
из нашего примера содержит две партиции: 201901
и 201902
. Используйте значения из этого столбца в запросах ALTER … PARTITION.
Столбец name
содержит названия кусков партиций. Значения из этого столбца можно использовать в запросах ALTER ATTACH PART.
Столбец active
отображает состояние куска. 1
означает, что кусок активен; 0
– неактивен. К неактивным можно отнести куски, оставшиеся после слияния данных. Поврежденные куски также отображаются как неактивные. Неактивные куски удаляются приблизительно через 10 минут после того, как было выполнено слияние.
Рассмотрим детальнее имя куска 201901_1_9_2_11
:
201901
имя партиции;1
– минимальный номер блока данных;9
– максимальный номер блока данных;2
– уровень куска (глубина дерева слияний, которыми этот кусок образован).11
- версия мутации (если парт мутировал)Примечание:
Названия кусков для таблиц старого типа образуются следующим образом: `20190117_20190123_2_2_0` (минимальная дата _ максимальная дата _ номер минимального блока _ номер максимального блока _ уровень).
Как видно из примера выше, таблица содержит несколько отдельных кусков для одной и той же партиции (например, куски 201901_1_3_1
и 201901_1_9_2
принадлежат партиции 201901
). Это означает, что эти куски еще не были объединены – в файловой системе они хранятся отдельно. После того как будет выполнено автоматическое слияние данных (выполняется примерно спустя 10 минут после вставки данных), исходные куски будут объединены в один более крупный кусок и помечены как неактивные.
Вы можете запустить внеочередное слияние данных с помощью запроса OPTIMIZE. Пример:
OPTIMIZE TABLE visits PARTITION 201902;
┌─partition─┬─name──────────────┬─active─┐
│ 201901 │ 201901_1_3_1 │ 0 │
│ 201901 │ 201901_1_9_2_11 │ 1 │
│ 201901 │ 201901_8_8_0 │ 0 │
│ 201901 │ 201901_9_9_0 │ 0 │
│ 201902 │ 201902_4_6_1 │ 0 │
│ 201902 │ 201902_4_11_2_11 │ 1 │
│ 201902 │ 201902_10_10_0 │ 0 │
│ 201902 │ 201902_11_11_0 │ 0 │
└────────────┴────────────────────┴────────┘
Неактивные куски будут удалены примерно через 10 минут после слияния.
Другой способ посмотреть набор кусков и партиций – зайти в директорию с данными таблицы: /var/lib/clickhouse/data/<database>/<table>/
. Например:
/var/lib/clickhouse/data/default/visits$ ls -l
total 40
drwxr-xr-x 2 clickhouse clickhouse 4096 Feb 1 16:48 201901_1_3_1
drwxr-xr-x 2 clickhouse clickhouse 4096 Feb 5 16:17 201901_1_9_2_11
drwxr-xr-x 2 clickhouse clickhouse 4096 Feb 5 15:52 201901_8_8_0
drwxr-xr-x 2 clickhouse clickhouse 4096 Feb 5 15:52 201901_9_9_0
drwxr-xr-x 2 clickhouse clickhouse 4096 Feb 5 16:17 201902_10_10_0
drwxr-xr-x 2 clickhouse clickhouse 4096 Feb 5 16:17 201902_11_11_0
drwxr-xr-x 2 clickhouse clickhouse 4096 Feb 5 16:19 201902_4_11_2_11
drwxr-xr-x 2 clickhouse clickhouse 4096 Feb 5 12:09 201902_4_6_1
drwxr-xr-x 2 clickhouse clickhouse 4096 Feb 1 16:48 detached
‘201901_1_1_0’, ‘201901_1_7_1’ и т. д. – это директории кусков партиции. Каждый кусок содержит данные только для соответствующего месяца (таблица в данном примере содержит партиционирование по месяцам).
Директория detached
содержит куски, отсоединенные от таблицы с помощью запроса DETACH. Поврежденные куски также попадают в эту директорию – они не удаляются с сервера.
Сервер не использует куски из директории detached
. Вы можете в любое время добавлять, удалять, модифицировать данные в директории detached - сервер не будет об этом знать, пока вы не сделаете запрос ATTACH.
Следует иметь в виду, что при работающем сервере нельзя вручную изменять набор кусков на файловой системе, так как сервер не будет знать об этом. Для нереплицируемых таблиц, вы можете это делать при остановленном сервере, однако это не рекомендуется. Для реплицируемых таблиц, набор кусков нельзя менять в любом случае.
ClickHouse позволяет производить различные манипуляции с кусками: удалять, копировать из одной таблицы в другую или создавать их резервные копии. Подробнее см. в разделе Манипуляции с партициями и кусками.
Движки разработаны для сценариев, когда необходимо быстро записывать много таблиц с небольшим объёмом данных (менее 1 миллиона строк), а затем читать их целиком.
Движки семейства:
Табличные движки семейства Log
могут хранить данные в распределенных файловых системах HDFS или S3.
Общие свойства:
Движки:
Во время запросов `INSERT` таблица блокируется, а другие запросы на чтение и запись ожидают разблокировки таблицы. Если запросов на запись данных нет, то можно выполнять любое количество конкуретных запросов на чтение.
Это означает, что запросы `SELECT` не эффективны для выборки диапазонов данных.
Вы можете получить таблицу с повреждёнными данными, если что-то прервёт операцию записи (например, аварийное завершение работы сервера).
Отличия:
Движок TinyLog
самый простой в семье и обеспечивает самые низкие функциональность и эффективность. Движок TinyLog
не поддерживает параллельного чтения данных в несколько потоков. Движок читает данные медленнее, чем оба других движка с параллельным чтением, и использует почти столько же дескрипторов, сколько и движок Log
, поскольку хранит каждый столбец в отдельном файле. Его можно использовать в простых сценариях с низкой нагрузкой.
Движки Log
и StripeLog
поддерживают параллельное чтение. При чтении данных, RT.WideStore использует множество потоков. Каждый поток обрабатывает отдельный блок данных. Движок Log
сохраняет каждый столбец таблицы в отдельном файле. Движок StripeLog
хранит все данные в одном файле. Таким образом, движок StripeLog
использует меньше дескрипторов в операционной системе, а движок Log
обеспечивает более эффективное считывание данных.
Движок относится к семейству движков Log. Смотрите общие свойства и различия движков в статье Семейство Log.
Движок разработан для сценариев, когда необходимо записывать много таблиц с небольшим объёмом данных (менее 1 миллиона строк).
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
column1_name [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
column2_name [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = StripeLog
Смотрите подробное описание запроса CREATE TABLE.
Движок StripeLog
хранит все столбцы в одном файле. При каждом запросе INSERT
, RT.WideStore добавляет блок данных в конец файла таблицы, записывая столбцы один за другим.
Для каждой таблицы RT.WideStore записывает файлы:
data.bin
— файл с данными.index.mrk
— файл с метками. Метки содержат смещения для каждого столбца каждого вставленного блока данных.Движок StripeLog
не поддерживает запросы ALTER UPDATE
и ALTER DELETE
.
Файл с метками позволяет RT.WideStore распараллеливать чтение данных. Это означает, что запрос SELECT
возвращает строки в непредсказуемом порядке. Используйте секцию ORDER BY
для сортировки строк.
Создание таблицы:
CREATE TABLE stripe_log_table
(
timestamp DateTime,
message_type String,
message String
)
ENGINE = StripeLog
Вставка данных:
INSERT INTO stripe_log_table VALUES (now(),'REGULAR','The first regular message')
INSERT INTO stripe_log_table VALUES (now(),'REGULAR','The second regular message'),(now(),'WARNING','The first warning message')
Мы использовали два запроса INSERT
для создания двух блоков данных внутри файла data.bin
.
RT.WideStore использует несколько потоков при выборе данных. Каждый поток считывает отдельный блок данных и возвращает результирующие строки независимо по мере завершения. В результате порядок блоков строк в выходных данных в большинстве случаев не совпадает с порядком тех же блоков во входных данных. Например:
SELECT * FROM stripe_log_table
┌────────────timestamp─┬─message_type─┬─message──────────────────────┐
│ 2019-01-18 14:27:32 │ REGULAR │ The second regular message │
│ 2019-01-18 14:34:53 │ WARNING │ The first warning message │
└───────────────────────┴───────────────┴───────────────────────────────┘
┌────────────timestamp─┬─message_type─┬─message─────────────────────┐
│ 2019-01-18 14:23:43 │ REGULAR │ The first regular message │
└───────────────────────┴───────────────┴─────────────────────────────┘
Сортировка результатов (по умолчанию по возрастанию):
SELECT * FROM stripe_log_table ORDER BY timestamp
┌────────────timestamp─┬─message_type─┬─message──────────────────────┐
│ 2019-01-18 14:23:43 │ REGULAR │ The first regular message │
│ 2019-01-18 14:27:32 │ REGULAR │ The second regular message │
│ 2019-01-18 14:34:53 │ WARNING │ The first warning message │
└───────────────────────┴───────────────┴───────────────────────────────┘
Движок относится к семейству движков Log
. Смотрите общие свойства и различия движков в статье Семейство Log.
Отличается от TinyLog тем, что вместе с файлами столбцов лежит небольшой файл "засечек". Засечки пишутся на каждый блок данных и содержат смещение: с какого места нужно читать файл, чтобы пропустить заданное количество строк. Это позволяет читать данные из таблицы в несколько потоков. При конкурентном доступе к данным чтения могут выполняться одновременно, а записи блокируют чтения и друг друга. Движок Log
не поддерживает индексы. Также, если при записи в таблицу произошёл сбой, то таблица станет битой, и чтения из нее будут возвращать ошибку. Движок Log
подходит для временных данных, write-once таблиц, а также для тестовых и демонстрационных целей.
Движок относится к семейству движков Log. Смотрите общие свойства и различия движков в статье Семейство Log.
Типичный способ использования этой движка — это write-once: сначала данные один раз записываются, а затем читаются столько раз, сколько это необходимо. Например, можно использовать таблицы с движком TinyLog
для хранения промежуточных данных, которые обрабатываются небольшими блоками. Учтите, что хранить данные в большом количестве мелких таблиц неэффективно.
Запросы выполняются в один поток. То есть, этот движок предназначен для сравнительно маленьких таблиц (до 1 000 000 строк). Этот движок таблиц имеет смысл использовать в том случае, когда у вас есть много маленьких таблиц, так как он проще, чем движок Log (требуется открывать меньше файлов).
Для интеграции с внешними системами RT.WideStore предоставляет различные средства, включая движки таблиц. Конфигурирование интеграционных движков осуществляется с помощью запросов CREATE TABLE
или ALTER TABLE
, как и для других табличных движков. С точки зрения пользователя, настроенная интеграция выглядит как обычная таблица, но запросы к ней передаются через прокси во внешнюю систему. Этот прозрачный запрос является одним из ключевых преимуществ этого подхода по сравнению с альтернативными методами интеграции, такими как внешние словари или табличные функции, которые требуют использования пользовательских методов запроса при каждом использовании.
Список поддерживаемых интеграций:
Позволяет RT.WideStore подключаться к внешним базам данных с помощью ODBC.
Чтобы использование ODBC было безопасным, RT.WideStore использует отдельную программу clickhouse-odbc-bridge
. Если драйвер ODBC подгружать непосредственно из clickhouse-server
, то проблемы с драйвером могут привести к аварийной остановке сервера RT.WideStore. RT.WideStore автоматически запускает clickhouse-odbc-bridge
по мере необходимости. Программа устанавливается из того же пакета, что и clickhouse-server
.
Движок поддерживает тип данных Nullable.
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1],
name2 [type2],
...
)
ENGINE = ODBC(connection_settings, external_database, external_table)
Смотрите подробное описание запроса CREATE TABLE.
Структура таблицы может отличаться от структуры исходной таблицы в удалённой СУБД:
Параметры движка:
connection_settings
— название секции с настройками соединения в файле odbc.ini
.external_database
— имя базы данных во внешней СУБД.external_table
— имя таблицы в external_database
.Извлечение данных из локальной установки MySQL через ODBC:
Этот пример проверялся в Ubuntu Linux 18.04 для MySQL server 5.7.
Убедитесь, что unixODBC и MySQL Connector установлены.
По умолчанию (если установлен из пакетов) RT.WideStore запускается от имени пользователя clickhouse
. Таким образом, вам нужно создать и настроить этого пользователя на сервере MySQL.
$ sudo mysql
mysql> CREATE USER 'clickhouse'@'localhost' IDENTIFIED BY 'clickhouse';
mysql> GRANT ALL PRIVILEGES ON *.* TO 'clickhouse'@'clickhouse' WITH GRANT OPTION;
Теперь настроим соединение в /etc/odbc.ini
.
$ cat /etc/odbc.ini
[mysqlconn]
DRIVER = /usr/local/lib/libmyodbc5w.so
SERVER = 127.0.0.1
PORT = 3306
DATABASE = test
USERNAME = clickhouse
PASSWORD = clickhouse
Вы можете проверить соединение с помощью утилиты isql
из установки unixODBC.
$ isql -v mysqlconn
+---------------------------------------+
| Connected! |
| |
...
Таблица в MySQL:
mysql> CREATE TABLE `test`.`test` (
-> `int_id` INT NOT NULL AUTO_INCREMENT,
-> `int_nullable` INT NULL DEFAULT NULL,
-> `float` FLOAT NOT NULL,
-> `float_nullable` FLOAT NULL DEFAULT NULL,
-> PRIMARY KEY (`int_id`));
Query OK, 0 rows affected (0,09 sec)
mysql> insert into test (`int_id`, `float`) VALUES (1,2);
Query OK, 1 row affected (0,00 sec)mysql> select * from test;
+--------+--------------+-------+----------------+
| int_id | int_nullable | float | float_nullable |
+--------+--------------+-------+----------------+
| 1 | NULL | 2 | NULL |
+--------+--------------+-------+----------------+
1 row in set (0,00 sec)
Таблица в RT.WideStore, которая получает данные из таблицы MySQL:
CREATE TABLE odbc_t
(
`int_id` Int32,
`float_nullable` Nullable(Float32)
)
ENGINE = ODBC('DSN=mysqlconn', 'test', 'test')
SELECT * FROM odbc_t
┌─int_id─┬─float_nullable─┐
│ 1 │ ᴺᵁᴸᴸ │
└─────────┴─────────────────┘
Смотрите также:
Позволяет RT.WideStore подключаться к внешним базам данных с помощью JDBC.
Для реализации соединения по JDBC RT.WideStore использует отдельную программу clickhouse-jdbc-bridge, которая должна запускаться как демон.
Движок поддерживает тип данных Nullable.
CREATE TABLE [IF NOT EXISTS] [db.]table_name
ENGINE = JDBC(datasource_uri, external_database, external_table)
Параметры движка:
datasource_uri
— URI или имя внешней СУБД.
URI Формат: jdbc:<driver_name>://<host_name>:<port>/?user=<username>&password=<password>
.
Пример для MySQL: jdbc:mysql://localhost:3306/?user=root&password=root
.
external_database
— база данных во внешней СУБД.
external_table
— таблицы в external_database
или запросе выбора, например select * from table1, где column1 = 1
.
Создадим таблицу в на сервере MySQL с помощью консольного клиента MySQL:
mysql> CREATE TABLE `test`.`test` (
-> `int_id` INT NOT NULL AUTO_INCREMENT,
-> `int_nullable` INT NULL DEFAULT NULL,
-> `float` FLOAT NOT NULL,
-> `float_nullable` FLOAT NULL DEFAULT NULL,
-> PRIMARY KEY (`int_id`));
Query OK, 0 rows affected (0,09 sec)
mysql> insert into test (`int_id`, `float`) VALUES (1,2);
Query OK, 1 row affected (0,00 sec)
mysql> select * from test;
+--------+--------------+-------+----------------+
| int_id | int_nullable | float | float_nullable |
+--------+--------------+-------+----------------+
| 1 | NULL | 2 | NULL |
+--------+--------------+-------+----------------+
1 row in set (0,00 sec)
Создадим таблицу на сервере RT.WideStore и получим из неё данные:
CREATE TABLE jdbc_table ENGINE JDBC('jdbc:mysql://localhost:3306/?user=root&password=root', 'test', 'test')
DESCRIBE TABLE jdbc_table
┌─name────────────────┬─type─────────────────┬─default_type─┬─default_expression─┐
│ int_id │ Int32 │ │ │
│ int_nullable │ Nullable(Int32) │ │ │
│ float │ Float32 │ │ │
│ float_nullable │ Nullable(Float32) │ │ │
└──────────────────────┴──────────────────────┴───────────────┴──────────────────────┘
SELECT *
FROM jdbc_table
┌─int_id─┬─int_nullable─┬─float─┬─float_nullable─┐
│ 1 │ ᴺᵁᴸᴸ │ 2 │ ᴺᵁᴸᴸ │
└─────────┴───────────────┴───────┴─────────────────┘
INSERT INTO jdbc_table(`int_id`, `float`)
SELECT toInt32(number), toFloat32(number * 1.0)
FROM system.numbers
Смотрите также:
Движок MySQL позволяет выполнять запросы SELECT
и INSERT
над данными, хранящимися на удалённом MySQL сервере.
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause'])
SETTINGS
[ connection_pool_size=16, ]
[ connection_max_tries=3, ]
[ connection_wait_timeout=5, ]
[ connection_auto_close=true, ]
[ connect_timeout=10, ]
[ read_write_timeout=300 ]
;
Смотрите подробное описание запроса CREATE TABLE.
Структура таблицы может отличаться от структуры исходной таблицы MySQL:
Параметры движка:
host:port
— адрес сервера MySQL.
database
— имя базы данных на удалённом сервере.
table
— имя таблицы на удалённом сервере.
user
— пользователь MySQL.
password
— пароль пользователя.
replace_query
— флаг, отвечающий за преобразование запросов INSERT INTO
в REPLACE INTO
. Если replace_query=1
, то запрос заменяется.
on_duplicate_clause
— выражение ON DUPLICATE KEY on_duplicate_clause
, добавляемое к запросу INSERT
.
Пример: `INSERT INTO t (c1,c2) VALUES ('a', 2) ON DUPLICATE KEY UPDATE c2 = c2 + 1`, где `on_duplicate_clause` это `UPDATE c2 = c2 + 1`.
Чтобы узнать какие `on_duplicate_clause` можно использовать с секцией `ON DUPLICATE KEY` обратитесь к [документации MySQL](https://dev.mysql.com/doc/refman/8.0/en/insert-on-duplicate.html).Чтобы указать `on_duplicate_clause` необходимо передать `0` в параметр `replace_query`. Если одновременно передать `replace_query = 1` и `on_duplicate_clause`, то RT.WideStore сгенерирует исключение.
Простые условия WHERE
такие как =, !=, >, >=, <, =
выполняются на стороне сервера MySQL.
Остальные условия и ограничение выборки LIMIT
будут выполнены в RT.WideStore только после выполнения запроса к MySQL.
Поддерживает несколько реплик, которые должны быть перечислены через |
. Например:
CREATE TABLE test_replicas (id UInt32, name String, age UInt32, money UInt32) ENGINE = MySQL(`mysql{2|3|4}:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse');
Таблица в MySQL:
mysql> CREATE TABLE `test`.`test` (
-> `int_id` INT NOT NULL AUTO_INCREMENT,
-> `int_nullable` INT NULL DEFAULT NULL,
-> `float` FLOAT NOT NULL,
-> `float_nullable` FLOAT NULL DEFAULT NULL,
-> PRIMARY KEY (`int_id`));
Query OK, 0 rows affected (0,09 sec)
mysql> insert into test (`int_id`, `float`) VALUES (1,2);
Query OK, 1 row affected (0,00 sec)
mysql> select * from test;
+--------+--------------+-------+----------------+
| int_id | int_nullable | float | float_nullable |
+--------+--------------+-------+----------------+
| 1 | NULL | 2 | NULL |
+--------+--------------+-------+----------------+
1 row in set (0,00 sec)
Таблица в RT.WideStore, которая получает данные из созданной ранее таблицы MySQL:
CREATE TABLE mysql_table
(
`float_nullable` Nullable(Float32),
`int_id` Int32
)
ENGINE = MySQL('localhost:3306', 'test', 'test', 'bayonet', '123')
SELECT * FROM mysql_table
┌─float_nullable─┬─int_id─┐
│ ᴺᵁᴸᴸ │ 1 │
└─────────────────┴─────────┘
Настройки по умолчанию не очень эффективны, так как они не используют повторное соединение. Эти настройки позволяют увеличить количество запросов, выполняемых сервером в секунду.
Позволяет автоматически закрыть соединение после выполнения запроса, то есть отключить повторное использование соединения.
Возможные значения:
Значение по умолчанию: 1
.
Устанавливает количество повторных попыток для пула со сбоями соединения.
Возможные значения:
Значение по умолчанию: 3
.
Задает размер пула соединений (если используются все соединения, запрос будет ждать, пока какое-либо соединение не будет освобождено).
Возможные значения:
Значение по умолчанию: 16
.
Задает таймаут (в секундах) ожидания свободного подключения (в случае, если уже есть активные подключения connection_pool_size), 0 - не ждать.
Возможные значения:
Значение по умолчанию: 5
.
Задает таймаут ожидания подключения (в секундах).
Возможные значения:
Значение по умолчанию: 10
.
Задает таймаут ожидания ввода/вывода (в секундах).
Возможные значения:
Значение по умолчанию: 300
.
См. также:
Этот движок обеспечивает интеграцию с экосистемой Amazon S3. Он похож на движок HDFS, но обеспечивает специфические для S3 возможности.
CREATE TABLE s3_engine_table (name String, value UInt32)
ENGINE = S3(path, [aws_access_key_id, aws_secret_access_key,] format, [compression])
[SETTINGS ...]
Параметры движка:
path
— URL-адрес бакета с указанием пути к файлу. Поддерживает следующие подстановочные знаки в режиме "только чтение": *
, ?
, {abc,def}
и {N..M}
где N
, M
— числа, 'abc'
, 'def'
— строки. Подробнее смотри ниже.format
— формат файла.aws_access_key_id
, aws_secret_access_key
- данные пользователя учетной записи AWS. Вы можете использовать их для аутентификации ваших запросов. Необязательный параметр. Если параметры учетной записи не указаны, то используются данные из конфигурационного файла. Смотрите подробнее Использование сервиса S3 для хранения данных.compression
— тип сжатия. Возможные значения: none
, gzip/gz
, brotli/br
, xz/LZMA
, zstd/zst
. Необязательный параметр. Если не указано, то тип сжатия определяется автоматически по расширению файла.Пример:
CREATE TABLE s3_engine_table (name String, value UInt32)
ENGINE=S3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'gzip')
SETTINGS input_format_with_names_use_header = 0;
INSERT INTO s3_engine_table VALUES ('one', 1), ('two', 2), ('three', 3);
SELECT * FROM s3_engine_table LIMIT 2;
┌─name─┬─value─┐
│ one │ 1 │
│ two │ 2 │
└──────┴────────┘
_path
— путь к файлу._file
— имя файла.Подробнее про виртуальные столбцы можно прочитать здесь.
ALTER
и SELECT...SAMPLE
,Аргумент path
может указывать на несколько файлов, используя символы подстановки. Для обработки файл должен существовать и соответствовать всему шаблону пути. Список файлов определяется во время выполнения запроса SELECT
(не в момент выполнения запроса CREATE
).
*
— заменяет любое количество любых символов, кроме /
, включая пустую строку.?
— заменяет любые одиночные символы.{some_string, another_string, yet_another_one}
— заменяет любые строки 'some_string', 'another_string', 'yet_another_one'
.{N..M}
— заменяет любое число от N до M, включая обе границы. N и M могут иметь ведущие нули, например 000..078
.Конструкции с {}
аналогичны функции remote.
ПРИМЕЧАНИЕ:
Если список файлов содержит диапазоны чисел с ведущими нулями, используйте конструкцию с фигурными скобками для каждой цифры отдельно или используйте `?`.
Пример подстановки 1
Таблица содержит данные из файлов с именами file-000.csv
, file-001.csv
, … , file-999.csv
:
CREATE TABLE big_table (name String, value UInt32)
ENGINE = S3('https://storage.yandexcloud.net/my-bucket/my_folder/file-{000..999}.csv', 'CSV');
Пример подстановки 2
Предположим, есть несколько файлов в формате CSV со следующими URL-адресами в S3:
Существует несколько способов создать таблицу, включающую в себя все шесть файлов:
1. Задайте диапазон для суффиксов в названии файла:
CREATE TABLE table_with_range (name String, value UInt32)
ENGINE = S3('https://storage.yandexcloud.net/my-bucket/{some,another}_folder/some_file_{1..3}', 'CSV');
2. Таблица содержит все файлы с префиксом some_file_
(в каталогах не должно быть других файлов с таким префиксом):
CREATE TABLE table_with_question_mark (name String, value UInt32)
ENGINE = S3('https://storage.yandexcloud.net/my-bucket/{some,another}_folder/some_file_?', 'CSV');
3. Таблица содержит все файлы в обоих каталогах (в каталогах не должно быть других файлов, соответствующих формату и схеме, описанным в запросе):
CREATE TABLE table_with_asterisk (name String, value UInt32)
ENGINE = S3('https://storage.yandexcloud.net/my-bucket/{some,another}_folder/*', 'CSV');
Перед выполнением запроса или в конфигурационном файле могут быть установлены следующие настройки:
s3_max_single_part_upload_size
— максимальный размер объекта для загрузки с использованием однокомпонентной загрузки в S3. Значение по умолчанию — 64 Mб
.s3_min_upload_part_size
— минимальный размер объекта для загрузки при многокомпонентной загрузке в S3 Multipart upload. Значение по умолчанию — 512 Mб
.s3_max_redirects
— максимальное количество разрешенных переадресаций S3. Значение по умолчанию — 10
.s3_single_read_retries
— максимальное количество попыток запроса при единичном чтении. Значение по умолчанию — 4
.Соображение безопасности: если злонамеренный пользователь попробует указать произвольные URL-адреса S3, параметр s3_max_redirects
должен быть установлен в ноль, чтобы избежать атак SSRF. Как альтернатива, в конфигурации сервера должен быть указан remote_host_filter
.
Для точки приема запроса (которая соответствует точному префиксу URL-адреса) в конфигурационном файле могут быть заданы следующие настройки:
Обязательная настройка:
endpoint
— указывает префикс точки приема запроса.Необязательные настройки:
access_key_id
и secret_access_key
— указывают учетные данные для использования с данной точкой приема запроса.use_environment_credentials
— если true
, S3-клиент будет пытаться получить учетные данные из переменных среды и метаданных Amazon EC2 для данной точки приема запроса. Значение по умолчанию — false
.use_insecure_imds_request
— признак использования менее безопасного соединения при выполнении запроса к IMDS при получении учётных данных из метаданных Amazon EC2. Значение по умолчанию — false
.region
— название региона S3.header
— добавляет указанный HTTP-заголовок к запросу на заданную точку приема запроса. Может быть определен несколько раз.server_side_encryption_customer_key_base64
— устанавливает необходимые заголовки для доступа к объектам S3 с шифрованием SSE-C.single_read_retries
— Максимальное количество попыток запроса при единичном чтении. Значение по умолчанию — 4
.Пример
<s3>
<endpoint-name>
<endpoint>https://storage.yandexcloud.net/my-test-bucket-768/</endpoint>
<!-- <access_key_id>ACCESS_KEY_ID</access_key_id> -->
<!-- <secret_access_key>SECRET_ACCESS_KEY</secret_access_key> -->
<!-- <region>us-west-1</region> -->
<!-- <use_environment_credentials>false</use_environment_credentials> -->
<!-- <use_insecure_imds_request>false</use_insecure_imds_request> -->
<!-- <header>Authorization: Bearer SOME-TOKEN</header> -->
<!-- <server_side_encryption_customer_key_base64>BASE64-ENCODED-KEY</server_side_encryption_customer_key_base64> -->
<!-- <single_read_retries>4</single_read_retries> -->
</endpoint-name>
</s3>
Смотрите также:
Движок таблиц MongoDB позволяет читать данные из коллекций СУБД MongoDB. В таблицах допустимы только плоские (не вложенные) типы данных. Запись (INSERT
-запросы) не поддерживается.
CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
name1 [type1],
name2 [type2],
...
) ENGINE = MongoDB(host:port, database, collection, user, password [, options]);
Параметры движка:
host:port
— адрес сервера MongoDB.
database
— имя базы данных на удалённом сервере.
collection
— имя коллекции на удалённом сервере.
user
— пользователь MongoDB.
password
— пароль пользователя.
options
— MongoDB connection string options (optional parameter).
Создание таблицы в RT.WideStore для чтения данных из коллекции MongoDB:
CREATE TABLE mongo_table
(
key UInt64,
data String
) ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table', 'testuser', 'clickhouse');
Чтение с сервера MongoDB, защищенного SSL:
CREATE TABLE mongo_table_ssl
(
key UInt64,
data String
) ENGINE = MongoDB('mongo2:27017', 'test', 'simple_table', 'testuser', 'clickhouse', 'ssl=true');
Запрос к таблице:
SELECT COUNT() FROM mongo_table;
┌─count()─┐
│ 4 │
└──────────┘
Этот движок обеспечивает интеграцию с экосистемой Apache Hadoop, позволяя управлять данными в HDFS посредством RT.WideStore. Данный движок похож на движки File и URL, но предоставляет возможности, характерные для Hadoop.
ENGINE = HDFS(URI, format)
Параметры движка:
В параметр URI
нужно передавать полный URI файла в HDFS. Часть URI с путем файла может содержать шаблоны. В этом случае таблица может использоваться только для чтения. Параметр format
должен быть таким, который RT.WideStore может использовать и в запросах INSERT
, и в запросах SELECT
. Полный список поддерживаемых форматов смотрите в разделе Форматы.
Пример:
1. Создадим на сервере таблицу hdfs_engine_table
:
CREATE TABLE hdfs_engine_table (name String, value UInt32) ENGINE=HDFS('hdfs://hdfs1:9000/other_storage', 'TSV')
2. Заполним файл:
INSERT INTO hdfs_engine_table VALUES ('one', 1), ('two', 2), ('three', 3)
3. Запросим данные:
SELECT * FROM hdfs_engine_table LIMIT 2
┌─name─┬─value─┐
│ one │ 1 │
│ two │ 2 │
└──────┴────────┘
ALTER
и SELECT...SAMPLE
;Шаблоны в пути:
Шаблоны могут содержаться в нескольких компонентах пути. Обрабатываются только существующие файлы, название которых целиком удовлетворяет шаблону (не только суффиксом или префиксом).
*
— Заменяет любое количество любых символов кроме /
, включая отсутствие символов.?
— Заменяет ровно один любой символ.{some_string,another_string,yet_another_one}
— Заменяет любую из строк 'some_string', 'another_string', 'yet_another_one'
.{N..M}
— Заменяет любое число в интервале от N
до M
включительно (может содержать ведущие нули).Конструкция с {}
аналогична табличной функции remote.
Пример:
Предположим, у нас есть несколько файлов со следующими URI в HDFS:
Есть несколько возможностей создать таблицу, состоящую из этих шести файлов:
CREATE TABLE table_with_range (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/{some,another}_dir/some_file_{1..3}', 'TSV')
Другой способ:
CREATE TABLE table_with_question_mark (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/{some,another}_dir/some_file_?', 'TSV')
Таблица, состоящая из всех файлов в обеих директориях (все файлы должны удовлетворять формату и схеме, указанной в запросе):
CREATE TABLE table_with_asterisk (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/{some,another}_dir/*', 'TSV')
Предупреждение:
Если список файлов содержит числовые интервалы с ведущими нулями, используйте конструкцию с фигурными скобочками для каждой цифры или используйте `?`.
Пример:
Создадим таблицу с именами file000
, file001
, … , file999
:
CREATE TABLE big_table (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV')
Похоже на GraphiteMergeTree, движок HDFS поддерживает расширенную конфигурацию с использованием файла конфигурации RT.WideStore. Есть два раздела конфигурации которые вы можете использовать: глобальный (hdfs
) и на уровне пользователя (hdfs_*
). Глобальные настройки применяются первыми, и затем применяется конфигурация уровня пользователя (если она указана).
<!-- Глобальные настройки для движка HDFS -->
<hdfs>
<hadoop_kerberos_keytab>/tmp/keytab/clickhouse.keytab</hadoop_kerberos_keytab>
<hadoop_kerberos_principal>clickuser@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
<hadoop_security_authentication>kerberos</hadoop_security_authentication>
</hdfs>
<!-- Конфигурация специфичная для пользователя "root" -->
<hdfs_root>
<hadoop_kerberos_principal>root@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
</hdfs_root>
Параметры конфигурации
Поддерживаемые из libhdfs3
параметр |
по умолчанию |
---|---|
rpc_client_connect_tcpnodelay | true |
dfs_client_read_shortcircuit | true |
output_replace-datanode-on-failure | true |
input_notretry-another-node | false |
input_localread_mappedfile | true |
dfs_client_use_legacy_blockreader_local | false |
rpc_client_ping_interval | 10 * 1000 |
rpc_client_connect_timeout | 600 * 1000 |
rpc_client_read_timeout | 3600 * 1000 |
rpc_client_write_timeout | 3600 * 1000 |
rpc_client_socekt_linger_timeout | -1 |
rpc_client_connect_retry | 10 |
rpc_client_timeout | 3600 * 1000 |
dfs_default_replica | 3 |
input_connect_timeout | 600 * 1000 |
input_read_timeout | 3600 * 1000 |
input_write_timeout | 3600 * 1000 |
input_localread_default_buffersize | 1 1024 1024 |
dfs_prefetchsize | 10 |
input_read_getblockinfo_retry | 3 |
input_localread_blockinfo_cachesize | 1000 |
input_read_max_retry | 60 |
output_default_chunksize | 512 |
output_default_packetsize | 64 * 1024 |
output_default_write_retry | 10 |
output_connect_timeout | 600 * 1000 |
output_read_timeout | 3600 * 1000 |
output_write_timeout | 3600 * 1000 |
output_close_timeout | 3600 * 1000 |
output_packetpool_size | 1024 |
output_heeartbeat_interval | 10 * 1000 |
dfs_client_failover_max_attempts | 15 |
dfs_client_read_shortcircuit_streams_cache_size | 256 |
dfs_client_socketcache_expiryMsec | 3000 |
dfs_client_socketcache_capacity | 16 |
dfs_default_blocksize | 64 1024 1024 |
dfs_default_uri | "hdfs://localhost:9000" |
hadoop_security_authentication | "simple" |
hadoop_security_kerberos_ticket_cache_path | "" |
dfs_client_log_severity | "INFO" |
dfs_domain_socket_path | "" |
Руководство по конфигурации HDFS поможет объяснить назначения некоторых параметров.
Расширенные параметры для RT.WideStore
параметр |
по умолчанию |
---|---|
hadoop_kerberos_keytab | "" |
hadoop_kerberos_principal | "" |
Ограничения:
hadoop_security_kerberos_ticket_cache_path
и libhdfs3_conf
могут быть определены только на глобальном, а не на пользовательском уровнеЕсли параметр hadoop_security_authentication
имеет значение kerberos
, RT.WideStore аутентифицируется с помощью Kerberos. Расширенные параметры и hadoop_security_kerberos_ticket_cache_path
помогают сделать это. Обратите внимание что из-за ограничений libhdfs3 поддерживается только устаревший метод аутентификации, коммуникация с узлами данных не защищена SASL (HADOOP_SECURE_DN_USER
надежный показатель такого подхода к безопасности). Используйте tests/integration/test_storage_kerberized_hdfs/hdfs_configs/bootstrap.sh
для примера настроек.
Если hadoop_kerberos_keytab
, hadoop_kerberos_principal
или hadoop_security_kerberos_ticket_cache_path
указаны в настройках, будет использоваться аутентификация с помощью Kerberos. hadoop_kerberos_keytab
и hadoop_kerberos_principal
обязательны в этом случае.
_path
— Путь к файлу._file
— Имя файла.См. также:
Движок позволяет импортировать и экспортировать данные из SQLite, а также поддерживает отправку запросов к таблицам SQLite напрямую из RT.WideStore.
CREATE TABLE [IF NOT EXISTS] [db.]table_name
(
name1 [type1],
name2 [type2], ...
) ENGINE = SQLite('db_path', 'table')
Параметры движка:
db_path
— путь к файлу с базой данных SQLite.table
— имя таблицы в базе данных SQLite.Отобразим запрос, с помощью которого была создана таблица SQLite:
SHOW CREATE TABLE sqlite_db.table2;
CREATE TABLE SQLite.table2
(
`col1` Nullable(Int32),
`col2` Nullable(String)
)
ENGINE = SQLite('sqlite.db','table2');
Получим данные из таблицы:
SELECT * FROM sqlite_db.table2 ORDER BY col1;
┌─col1─┬─col2──┐
│ 1 │ text1 │
│ 2 │ text2 │
│ 3 │ text3 │
└──────┴────────┘
См. также:
Движок работает с Apache Kafka.
Kafka позволяет:
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port',
kafka_topic_list = 'topic1,topic2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_schema = '',]
[kafka_num_consumers = N,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N]
[kafka_commit_every_batch = 0,]
[kafka_client_id = '',]
[kafka_poll_timeout_ms = 0,]
[kafka_poll_max_batch_size = 0,]
[kafka_flush_interval_ms = 0,]
[kafka_thread_per_consumer = 0,]
[kafka_handle_error_mode = 'default',]
[kafka_commit_on_select = false,]
[kafka_max_rows_per_message = 1];
Обязательные параметры:
kafka_broker_list
— перечень брокеров, разделенный запятыми (localhost:9092
).kafka_topic_list
— перечень необходимых топиков Kafka.kafka_group_name
— группа потребителя Kafka. Отступы для чтения отслеживаются для каждой группы отдельно. Если необходимо, чтобы сообщения не повторялись на кластере, используйте везде одно имя группы.kafka_format
— формат сообщений. Названия форматов должны быть теми же, что можно использовать в секции FORMAT
, например, JSONEachRow
. Подробнее читайте в разделе Форматы.Опциональные параметры:
kafka_row_delimiter
— символ-разделитель записей (строк), которым завершается сообщение.kafka_schema
— опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, Cap’n Proto требует путь к файлу со схемой и название корневого объекта schema.capnp:Message
.kafka_num_consumers
— количество потребителей (consumer) на таблицу. По умолчанию: 1
. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна. Общее число потребителей не должно превышать количество партиций в топике, так как на одну партицию может быть назначено не более одного потребителя.kafka_max_block_size
— максимальный размер пачек (в сообщениях) для poll (по умолчанию max_block_size
).kafka_skip_broken_messages
— максимальное количество некорректных сообщений в блоке. Если kafka_skip_broken_messages = N
, то движок отбрасывает N
сообщений Кафки, которые не получилось обработать. Одно сообщение в точности соответствует одной записи (строке). Значение по умолчанию – 0.kafka_commit_every_batch
— включает или отключает режим записи каждой принятой и обработанной пачки по отдельности вместо единой записи целого блока (по умолчанию 0
).kafka_client_id
— идентификатор клиента. Значение по умолчанию пусто – ''.kafka_poll_timeout_ms
- Таймаут для poll. По умолчанию: (../../../operations/settings/settings.md#stream_poll_timeout_ms)kafka_poll_max_batch_size
- Максимальное количество сообщений в одном poll Kafka. По умолчанию: (../../../operations/settings/settings.md#setting-max_block_size)kafka_flush_interval_ms
- Таймаут для сброса данных из Kafka. По умолчанию: (../../../operations/settings/settings.md#stream-flush-interval-ms)kafka_thread_per_consumer
— включает или отключает предоставление отдельного потока каждому потребителю (по умолчанию 0
). При включенном режиме каждый потребитель сбрасывает данные независимо и параллельно, при отключённом — строки с данными от нескольких потребителей собираются в один блок.kafka_handle_error_mode
- Способ обработки ошибок для Kafka. Возможные значения: default, stream.kafka_commit_on_select
- Сообщение о commit при запросе select. По умолчанию: false
.kafka_max_rows_per_message
- Максимальное количество строк записанных в одно сообщение Kafka для формата row-based. По умолчанию: 1
.Примеры:
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
SELECT * FROM queue LIMIT 5;
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
SETTINGS kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
Устаревший способ создания таблицы.
Полученные сообщения отслеживаются автоматически, поэтому из одной группы каждое сообщение считывается только один раз. Если необходимо получить данные дважды, то создайте копию таблицы с другим именем группы.
Группы пластичны и синхронизированы на кластере. Например, если есть 10 топиков и 5 копий таблицы в кластере, то в каждую копию попадет по 2 топика. Если количество копий изменится, то распределение топиков по копиям изменится автоматически. Подробно читайте об этом на http://kafka.apache.org/intro.
Чтение сообщения с помощью SELECT
не слишком полезно (разве что для отладки), поскольку каждое сообщения может быть прочитано только один раз. Практичнее создавать потоки реального времени с помощью материализованных представлений. Для этого:
Когда к движку присоединяется материализованное представление (MATERIALIZED VIEW
), оно начинает в фоновом режиме собирать данные. Это позволяет непрерывно получать сообщения от Kafka и преобразовывать их в необходимый формат с помощью SELECT
. Материализованных представлений у одной kafka таблицы может быть сколько угодно, они не считывают данные из таблицы kafka непосредственно, а получают новые записи (блоками), таким образом можно писать в несколько таблиц с разным уровнем детализации (с группировкой - агрегацией и без).
Пример:
CREATE TABLE queue (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
CREATE TABLE daily (
day Date,
level String,
total UInt64
) ENGINE = SummingMergeTree(day, (day, level), 8192);
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
FROM queue GROUP BY day, level; SELECT level, sum(total) FROM daily GROUP BY level;
Для улучшения производительности полученные сообщения группируются в блоки размера max_insert_block_size. Если блок не удалось сформировать за stream_flush_interval_ms миллисекунд, то данные будут сброшены в таблицу независимо от полноты блока.
Чтобы остановить получение данных топика или изменить логику преобразования, отсоедините материализованное представление:
DETACH TABLE consumer;
ATTACH TABLE consumer;
Если необходимо изменить целевую таблицу с помощью ALTER
, то материализованное представление рекомендуется отключить, чтобы избежать несостыковки между целевой таблицей и данными от представления.
Аналогично GraphiteMergeTree, движок Kafka поддерживает расширенную конфигурацию с помощью конфигурационного файла RT.WideStore. Существует два конфигурационных ключа, которые можно использовать: глобальный (kafka
) и по топикам (kafka_topic_*
). Сначала применяется глобальная конфигурация, затем конфигурация по топикам (если она существует).
<!-- Global configuration options for all tables of Kafka engine type -->
<kafka>
<debug>cgrp</debug>
<auto_offset_reset>smallest</auto_offset_reset>
</kafka>
<!-- Configuration specific for topic "logs" -->
<kafka_logs>
<retry_backoff_ms>250</retry_backoff_ms>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_logs>
В документе librdkafka configuration reference можно увидеть список возможных опций конфигурации. Используйте подчеркивание (_
) вместо точки в конфигурации RT.WideStore. Например, check.crcs=true
будет соответствовать <check_crcs>true</check_crcs>
.
Чтобы начать работу с Kafka с поддержкой Kerberos, добавьте дочерний элемент security_protocol
со значением sasl_plaintext
. Этого будет достаточно, если получен тикет на получение тикета (ticket-granting ticket) Kerberos и он кэшируется средствами ОС. RT.WideStore может поддерживать учетные данные Kerberos с помощью файла keytab. Рассмотрим дочерние элементы sasl_kerberos_service_name
, sasl_kerberos_keytab
и sasl_kerberos_principal
.
Пример:
<!-- Kerberos-aware Kafka -->
<kafka>
<security_protocol>SASL_PLAINTEXT</security_protocol>
<sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
<sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
</kafka>
_topic
— топик Kafka._key
— ключ сообщения._offset
— оффсет сообщения._timestamp
— временная метка сообщения._timestamp_ms
— временная метка сообщения в миллисекундах._partition
— секция топика Kafka._headers.name
- Массив ключей заголовков сообщений._headers.value
- Массив значений заголовков сообщений.Смотрите также:
Этот движок позволяет интегрировать RT.WideStore с rocksdb.
2.3.9.1 Создание таблицы
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = EmbeddedRocksDB
PRIMARY KEY(primary_key_name);
Обязательные параметры:
primary_key_name
может быть любое имя столбца из списка столбцов.primary key
является обязательным. Он будет сериализован в двоичном формате как ключ rocksdb
.rockdb
в соответствующем порядке.equals
или in
оптимизируются для поиска по нескольким ключам из rocksdb
.Пример:
CREATE TABLE test
(
`key` String,
`v1` UInt32,
`v2` String,
`v3` Float32,
)
ENGINE = EmbeddedRocksDB
PRIMARY KEY key;
Движок PostgreSQL позволяет выполнять запросы SELECT
и INSERT
для таблиц на удаленном сервере PostgreSQL.
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
) ENGINE = PostgreSQL('host:port', 'database', 'table', 'user', 'password'[, `schema`]);
Смотрите подробное описание запроса CREATE TABLE.
Структура таблицы может отличаться от структуры исходной таблицы PostgreSQL:
Параметры движка:
host:port
— адрес сервера PostgreSQL.database
— имя базы данных на сервере PostgreSQL.table
— имя таблицы.user
— имя пользователя PostgreSQL.password
— пароль пользователя PostgreSQL.schema
— имя схемы, если не используется схема по умолчанию. Необязательный аргумент.Запросы SELECT
на стороне PostgreSQL выполняются как COPY (SELECT ...) TO STDOUT
внутри транзакции PostgreSQL только на чтение с коммитом после каждого запроса SELECT
.
Простые условия для WHERE
, такие как =
, !=
, >
, >=
, <
, <=
и IN
, исполняются на стороне PostgreSQL сервера.
Все операции объединения, аггрегации, сортировки, условия IN [ array ]
и ограничения LIMIT
выполняются на стороне RT.WideStore только после того, как запрос к PostgreSQL закончился.
Запросы INSERT
на стороне PostgreSQL выполняются как COPY "table_name" (field1, field2, ... fieldN) FROM STDIN
внутри PostgreSQL транзакции с автоматическим коммитом после каждого запроса INSERT
.
PostgreSQL массивы конвертируются в массивы RT.WideStore.
ВНИМАНИЕ:
Будьте внимательны, в PostgreSQL массивы, созданные как `type_name[]`,
являются многомерными и могут содержать в себе разное количество измерений в разных строках одной таблицы.
Внутри RT.WideStore допустимы только многомерные массивы с одинаковым кол-вом измерений во всех строках таблицы.
Поддерживает несколько реплик, которые должны быть перечислены через |
. Например:
CREATE TABLE test_replicas (id UInt32, name String) ENGINE = PostgreSQL(`postgres{2|3|4}:5432`, 'clickhouse', 'test_replicas', 'postgres', 'mysecretpassword');
При использовании словаря PostgreSQL поддерживается приоритет реплик. Чем больше номер реплики, тем ниже ее приоритет. Наивысший приоритет у реплики с номером 0
.
В примере ниже реплика example01-1
имеет более высокий приоритет:
<postgresql>
<port>5432</port>
<user>clickhouse</user>
<password>qwerty</password>
<replica>
<host>example01-1</host>
<priority>1</priority>
</replica>
<replica>
<host>example01-2</host>
<priority>2</priority>
</replica>
<db>db_name</db>
<table>table_name</table>
<where>id=10</where>
<invalidate_query>SQL_QUERY</invalidate_query>
</postgresql></source>
Таблица в PostgreSQL:
postgres=# CREATE TABLE "public"."test" (
"int_id" SERIAL,
"int_nullable" INT NULL DEFAULT NULL,
"float" FLOAT NOT NULL,
"str" VARCHAR(100) NOT NULL DEFAULT '',
"float_nullable" FLOAT NULL DEFAULT NULL,
PRIMARY KEY (int_id));
CREATE TABLE
postgres=# INSERT INTO test (int_id, str, "float") VALUES (1,'test',2);
INSERT 0 1
postgresql> SELECT * FROM test;
int_id | int_nullable | float | str | float_nullable
--------+--------------+-------+------+----------------
1 | | 2 | test |
(1 row)
Таблица в RT.WideStore, получение данных из PostgreSQL таблицы, созданной выше:
CREATE TABLE default.postgresql_table
(
`float_nullable` Nullable(Float32),
`str` String,
`int_id` Int32
)
ENGINE = PostgreSQL('localhost:5432', 'public', 'test', 'postges_user', 'postgres_password');
SELECT * FROM postgresql_table WHERE str IN ('test');
┌─float_nullable─┬─str──┬─int_id─┐
│ ᴺᵁᴸᴸ │ test │ 1 │
└─────────────────┴───────┴────────┘
Using Non-default Schema:
postgres=# CREATE SCHEMA "nice.schema";
postgres=# CREATE TABLE "nice.schema"."nice.table" (a integer);
postgres=# INSERT INTO "nice.schema"."nice.table" SELECT i FROM generate_series(0, 99) as t(i)
CREATE TABLE pg_table_schema_with_dots (a UInt32)
ENGINE PostgreSQL('localhost:5432', 'clickhouse', 'nice.table', 'postgrsql_user', 'password', 'nice.schema');
См. также:
postgresql,
Движок ExternalDistributed
позволяет выполнять запросы SELECT
для таблиц на удаленном сервере MySQL или PostgreSQL. Принимает в качестве аргумента табличные движки MySQL или PostgreSQL, поэтому возможно шардирование.
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
) ENGINE = ExternalDistributed('engine', 'host:port', 'database', 'table', 'user', 'password');
Смотрите подробное описание запроса CREATE TABLE.
Структура таблицы может отличаться от структуры исходной таблицы:
Параметры движка:
engine
— табличный движок MySQL
или PostgreSQL
.host:port
— адрес сервера MySQL или PostgreSQL.database
— имя базы данных на сервере.table
— имя таблицы.user
— имя пользователя.password
— пароль пользователя.Поддерживает несколько реплик, которые должны быть перечислены через |
, а шарды — через ,
. Например:
CREATE TABLE test_shards (id UInt32, name String, age UInt32, money UInt32)
ENGINE = ExternalDistributed('MySQL', `mysql{1|2}:3306,mysql{3|4}:3306`, 'clickhouse', 'test_replicas', 'root', 'clickhouse');
При указании реплик для каждого из шардов при чтении выбирается одна из доступных реплик. Если соединиться не удалось, то выбирается следующая реплика, и так для всех реплик. Если попытка соединения не удалась для всех реплик, то сервер RT.WideStore снова пытается соединиться с одной из реплик, перебирая их по кругу, и так несколько раз.
Вы можете указать любое количество шардов и любое количество реплик для каждого шарда.
Смотрите также:
Создает таблицу RT.WideStoreс исходным дампом данных таблицы PostgreSQL и запускает процесс репликации, т.е. выполняется применение новых изменений в фоне, как эти изменения происходят в таблице PostgreSQL в удаленной базе данных PostgreSQL.
Если требуется более одной таблицы, вместо движка таблиц рекомендуется использовать движок баз данных MaterializedPostgreSQL и с помощью настройки materialized_postgresql_tables_list указывать таблицы, которые нужно реплицировать. Это будет намного лучше с точки зрения нагрузки на процессор, уменьшит количество подключений и количество слотов репликации внутри удаленной базы данных PostgreSQL.
CREATE TABLE postgresql_db.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres_user', 'postgres_password')
PRIMARY KEY key;
Параметры движка:
host:port
— адрес сервера PostgreSQL.database
— имя базы данных на удалённом сервере.table
— имя таблицы на удалённом сервере.user
— пользователь PostgreSQL.password
— пароль пользователя.Настройка wal_level должна иметь значение logical
, параметр max_replication_slots
должен быть равен по меньшей мере 2
в конфигурационном файле в PostgreSQL.
Таблица, созданная с помощью движка MaterializedPostgreSQL
, должна иметь первичный ключ — такой же, как replica identity index (по умолчанию: первичный ключ) таблицы PostgreSQL (смотрите replica identity index).
Допускается только база данных Atomic.
_version
— счетчик транзакций. Тип: UInt64._sign
— метка удаления. Тип: Int8. Возможные значения:1
— строка не удалена,-1
— строка удалена.Эти столбцы не нужно добавлять при создании таблицы. Они всегда доступны в SELECT
запросе. Столбец _version
равен позиции LSN
в WAL
, поэтому его можно использовать для проверки актуальности репликации.
CREATE TABLE postgresql_db.postgresql_replica (key UInt64, value UInt64)
ENGINE = MaterializedPostgreSQL('postgres1:5432', 'postgres_database', 'postgresql_replica', 'postgres_user', 'postgres_password')
PRIMARY KEY key;
SELECT key, value, _version FROM postgresql_db.postgresql_replica;
ПРЕДУПРЕЖДЕНИЕ:
Репликация **TOAST**-значений не поддерживается. Для типа данных будет использоваться значение по умолчанию.
Движок работает с RabbitMQ.
RabbitMQ
позволяет:
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = RabbitMQ SETTINGS
rabbitmq_host_port = 'host:port',
rabbitmq_exchange_name = 'exchange_name',
rabbitmq_format = 'data_format'[,]
[rabbitmq_exchange_type = 'exchange_type',]
[rabbitmq_routing_key_list = 'key1,key2,...',]
[rabbitmq_row_delimiter = 'delimiter_symbol',]
[rabbitmq_schema = '',]
[rabbitmq_num_consumers = N,]
[rabbitmq_num_queues = N,]
[rabbitmq_queue_base = 'queue',]
[rabbitmq_persistent = 0,]
[rabbitmq_skip_broken_messages = N,]
[rabbitmq_max_block_size = N,]
[rabbitmq_flush_interval_ms = N]
[rabbitmq_queue_settings_list = 'x-dead-letter-exchange=my-dlx,x-max-length=10,x-overflow=reject-publish']
Обязательные параметры:
rabbitmq_host_port
– адрес сервера (хост:порт
). Например: localhost:5672
.rabbitmq_exchange_name
– имя точки обмена в RabbitMQ.rabbitmq_format
– формат сообщения. Используется такое же обозначение, как и в функции FORMAT
в SQL, например, JSONEachRow
. Подробнее см. в разделе Форматы входных и выходных данных.Дополнительные параметры:
rabbitmq_exchange_type
– тип точки обмена в RabbitMQ: direct
, fanout
, topic
, headers
, consistent_hash
. По умолчанию: fanout
.rabbitmq_routing_key_list
– список ключей маршрутизации, через запятую.rabbitmq_row_delimiter
– символ-разделитель, который завершает сообщение.rabbitmq_schema
– опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, Cap’n Proto требует путь к файлу со схемой и название корневого объекта schema.capnp:Message
.rabbitmq_num_consumers
– количество потребителей на таблицу. По умолчанию: 1
. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна.rabbitmq_num_queues
– количество очередей. По умолчанию: 1
. Большее число очередей может сильно увеличить пропускную способность.rabbitmq_queue_base
- настройка для имен очередей. Сценарии использования описаны ниже.rabbitmq_persistent
- флаг, от которого зависит настройка 'durable' для сообщений при запросах INSERT
. По умолчанию: 0
.rabbitmq_skip_broken_messages
– максимальное количество некорректных сообщений в блоке. Если rabbitmq_skip_broken_messages = N
, то движок отбрасывает N
сообщений, которые не получилось обработать. Одно сообщение в точности соответствует одной записи (строке). Значение по умолчанию – 0.rabbitmq_max_block_size
rabbitmq_flush_interval_ms
rabbitmq_queue_settings_list
- позволяет самостоятельно установить настройки RabbitMQ при создании очереди. Доступные настройки: x-max-length
, x-max-length-bytes
, x-message-ttl
, x-expires
, x-priority
, x-max-priority
, x-overflow
, x-dead-letter-exchange
, x-queue-type
. Настройка durable
для очереди ставится автоматически.Настройки форматов данных также могут быть добавлены в списке RabbitMQ настроек.
Пример:
CREATE TABLE queue (
key UInt64,
value UInt64,
date DateTime
) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
rabbitmq_exchange_name = 'exchange1',
rabbitmq_format = 'JSONEachRow',
rabbitmq_num_consumers = 5,
date_time_input_format = 'best_effort';
Конфигурация сервера RabbitMQ добавляется с помощью конфигурационного файла RT.WideStore.
Требуемая конфигурация:
<rabbitmq>
<username>root</username>
<password>clickhouse</password>
</rabbitmq>
Дополнительная конфигурация:
<rabbitmq>
<vhost>clickhouse</vhost>
</rabbitmq>
Запрос SELECT
не очень полезен для чтения сообщений (за исключением отладки), поскольку каждое сообщение может быть прочитано только один раз. Практичнее создавать потоки реального времени с помощью материализованных преставлений. Для этого:
Когда к движку присоединяется материализованное представление, оно начинает в фоновом режиме собирать данные. Это позволяет непрерывно получать сообщения от RabbitMQ и преобразовывать их в необходимый формат с помощью SELECT
. У одной таблицы RabbitMQ может быть неограниченное количество материализованных представлений.
Данные передаются с помощью параметров rabbitmq_exchange_type
и rabbitmq_routing_key_list
. Может быть не более одной точки обмена на таблицу. Одна точка обмена может использоваться несколькими таблицами: это позволяет выполнять маршрутизацию по нескольким таблицам одновременно.
Параметры точек обмена:
direct
- маршрутизация основана на точном совпадении ключей. Пример списка ключей: key1,key2,key3,key4,key5
. Ключ сообщения может совпадать с одним из них.fanout
- маршрутизация по всем таблицам, где имя точки обмена совпадает, независимо от ключей.topic
- маршрутизация основана на правилах с ключами, разделенными точками. Например: *.logs
, records.*.*.2020
, *.2018,*.2019,*.2020
.headers
- маршрутизация основана на совпадении key=value
с настройкой x-match=all
или x-match=any
. Пример списка ключей таблицы: x-match=all,format=logs,type=report,year=2020
.consistent_hash
- данные равномерно распределяются между всеми связанными таблицами, где имя точки обмена совпадает. Обратите внимание, что этот тип обмена должен быть включен с помощью плагина RabbitMQ: rabbitmq-plugins enable rabbitmq_consistent_hash_exchange
.Настройка rabbitmq_queue_base
может быть использована в следующих случаях:
rabbitmq_queue_base
настройку и не указывать настройки rabbitmq_num_consumers
и rabbitmq_num_queues
. Чтобы восстановить чтение из всех очередей, которые были созданы для конкретной таблицы, необходимо совпадение следующих настроек: rabbitmq_queue_base
, rabbitmq_num_consumers
, rabbitmq_num_queues
. По умолчанию, если настройка rabbitmq_queue_base
не указана, будут использованы уникальные для каждой таблицы имена очередей.rabbitmq_num_consumers
, rabbitmq_num_queues
.durable
настройкой очереди, так как они не удаляются автоматически (но могут быть удалены с помощью любого RabbitMQ CLI).Для улучшения производительности полученные сообщения группируются в блоки размера max_insert_block_size. Если блок не удалось сформировать за stream_flush_interval_ms миллисекунд, то данные будут сброшены в таблицу независимо от полноты блока.
Если параметрыrabbitmq_num_consumers
и/или rabbitmq_num_queues
заданы вместе с параметром rabbitmq_exchange_type
:
rabbitmq-consistent-hash-exchange
должен быть включен.message_id
должно быть определено (уникальное для каждого сообщения/пакета).При запросах INSERT
отправляемым сообщениям добавляются метаданные: messageID
и флаг republished
- доступны через заголовки сообщений (headers). Для запросов чтения и вставки не должна использоваться одна и та же таблица.
Пример:
CREATE TABLE queue (
key UInt64,
value UInt64
) ENGINE = RabbitMQ SETTINGS rabbitmq_host_port = 'localhost:5672',
rabbitmq_exchange_name = 'exchange1',
rabbitmq_exchange_type = 'headers',
rabbitmq_routing_key_list = 'format=logs,type=report,year=2020',
rabbitmq_format = 'JSONEachRow',
rabbitmq_num_consumers = 5; CREATE TABLE daily (key UInt64, value UInt64)
ENGINE = MergeTree();
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT key, value FROM queue;
SELECT key, value FROM daily ORDER BY key;
_exchange_name
- имя точки обмена RabbitMQ._channel_id
- идентификатор канала ChannelID
, на котором было получено сообщение._delivery_tag
- значение DeliveryTag
полученного сообщения. Уникально в рамках одного канала._redelivered
- флаг redelivered
. (Не равно нулю, если есть возможность, что сообщение было получено более, чем одним каналом.)_message_id
- значение поля messageID
полученного сообщения. Данное поле непусто, если указано в параметрах при отправке сообщения._timestamp
- значение поля timestamp
полученного сообщения. Данное поле непусто, если указано в параметрах при отправке сообщения.Виртуальный столбец — это неотъемлемый атрибут движка таблиц, определенный в исходном коде движка.
Виртуальные столбцы не надо указывать в запросе CREATE TABLE
и они не отображаются в результатах запросов SHOW CREATE TABLE
и DESCRIBE TABLE
. Также виртуальные столбцы доступны только для чтения, поэтому вы не можете вставлять в них данные.
Чтобы получить данные из виртуального столбца, необходимо указать его название в запросе SELECT
. SELECT *
не отображает данные из виртуальных столбцов.
При создании таблицы со столбцом, имя которого совпадает с именем одного из виртуальных столбцов таблицы, виртуальный столбец становится недоступным. Не делайте так. Чтобы помочь избежать конфликтов, имена виртуальных столбцов обычно предваряются подчеркиванием.
Существует три основные категории движков таблиц:
Остальные движки таблиц уникальны по своему назначению и еще не сгруппированы в семейства, поэтому они помещены в эту специальную категорию:
Движок Distributed не хранит данные самостоятельно, а позволяет обрабатывать запросы распределённо, на нескольких серверах. Чтение автоматически распараллеливается. При чтении будут использованы индексы таблиц на удалённых серверах, если есть.
Движок Distributed принимает параметры:
имя кластера в конфигурационном файле сервера
имя удалённой базы данных
имя удалённой таблицы
(не обязательно) ключ шардирования.
(не обязательно) имя политики, оно будет использоваться для хранения временных файлов для асинхронной отправки
Смотрите также:
insert_distributed_sync
Пример:
Distributed(logs, default, hits[, sharding_key[, policy_name]])
данные будут читаться со всех серверов кластера logs, из таблицы default.hits, расположенной на каждом сервере кластера. Данные не только читаются, но и частично (настолько, насколько это возможно) обрабатываются на удалённых серверах. Например, при запросе с GROUP BY, данные будут агрегированы на удалённых серверах, промежуточные состояния агрегатных функций будут отправлены на запросивший сервер; затем данные будут доагрегированы.
Вместо имени базы данных может использоваться константное выражение, возвращающее строку. Например, currentDatabase().
logs - имя кластера в конфигурационном файле сервера.
Кластеры задаются следующим образом:
<remote_servers>
<logs>
<shard>
<!-- Не обязательно. Вес шарда при записи данных. По умолчанию, 1. -->
<weight>1</weight>
<!-- Не обязательно. Записывать ли данные только на одну, любую из реплик. По умолчанию, false - записывать данные на все реплики. -->
<internal_replication>false</internal_replication>
<replica>
<!-- Не обязательно. Приоритет реплики для балансировки нагрузки (смотрите также настройку load_balancing). По умолчанию : 1 (меньшее значение - больший приоритет). -->
<priority>1</priority>
<host>example01-01-1</host>
<port>9000</port>
</replica>
<replica>
<host>example01-01-2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<weight>2</weight>
<internal_replication>false</internal_replication>
<replica>
<host>example01-02-1</host>
<port>9000</port>
</replica>
<replica>
<host>example01-02-2</host>
<port>9000</port>
</replica>
</shard>
</logs></remote_servers>
Здесь задан кластер с именем logs, состоящий из двух шардов, каждый из которых состоит из двух реплик. Шардами называются серверы, содержащие разные части данных (чтобы прочитать все данные, нужно идти на все шарды). Репликами называются дублирующие серверы (чтобы прочитать данные, можно идти за данными на любую из реплик).
Имя кластера не должно содержать точки.
В качестве параметров для каждого сервера указываются host
, port
и, не обязательно, user
, password
, secure
, compression
:
host
- адрес удалённого сервера. Может быть указан домен, или IPv4 или IPv6 адрес. В случае указания домена, при старте сервера делается DNS запрос, и результат запоминается на всё время работы сервера. Если DNS запрос неуспешен, то сервер не запускается. Если вы изменяете DNS-запись, перезапустите сервер.port
- TCP-порт для межсерверного взаимодействия (в конфиге - tcp_port, обычно 9000). Не перепутайте с http_port.user
- имя пользователя для соединения с удалённым сервером. по умолчанию - default. Этот пользователь должен иметь доступ для соединения с указанным сервером. Доступы настраиваются в файле users.xml, подробнее смотрите в разделе Права доступа.password
- пароль для соединения с удалённым сервером, в открытом виде. по умолчанию - пустая строка.secure
- Использовать шифрованное соединение ssl, Обычно используется с портом port
= 9440. Сервер должен слушать порт <tcp_port_secure>9440</tcp_port_secure>
с корректными настройками сертификатов.compression
- Использовать сжатие данных. По умолчанию: true.При указании реплик, для каждого из шардов, при чтении, будет выбрана одна из доступных реплик. Можно настроить алгоритм балансировки нагрузки (то есть, предпочтения, на какую из реплик идти) - см. настройку load_balancing. Если соединение с сервером не установлено, то будет произведена попытка соединения с небольшим таймаутом. Если соединиться не удалось, то будет выбрана следующая реплика, и так для всех реплик. Если попытка соединения для всех реплик не удалась, то будут снова произведены попытки соединения по кругу, и так несколько раз. Это работает в пользу отказоустойчивости, хотя и не обеспечивает полную отказоустойчивость: удалённый сервер может принять соединение, но не работать, или плохо работать.
Можно указать от одного шарда (в таком случае, обработку запроса стоит называть удалённой, а не распределённой) до произвольного количества шардов. В каждом шарде можно указать от одной до произвольного числа реплик. Можно указать разное число реплик для каждого шарда.
Вы можете прописать сколько угодно кластеров в конфигурации.
Для просмотра имеющихся кластеров, вы можете использовать системную таблицу system.clusters.
Движок Distributed позволяет работать с кластером, как с локальным сервером. При этом, кластер является неэластичным: вы должны прописать его конфигурацию в конфигурационный файл сервера (лучше всех серверов кластера).
Как видно, движок Distributed требует прописывания кластера в конфигурационный файл; кластера из конфигурационного файла обновляются налету, без перезапуска сервера. Если вам необходимо каждый раз отправлять запрос на неизвестный набор шардов и реплик, вы можете не создавать Distributed таблицу, а воспользоваться табличной функцией remote. Смотрите раздел Табличные функции.
Есть два способа записывать данные на кластер:
Во-первых, вы можете самостоятельно определять, на какие серверы какие данные записывать, и выполнять запись непосредственно на каждый шард. То есть, делать INSERT в те таблицы, на которые «смотрит» распределённая таблица. Это наиболее гибкое решение поскольку вы можете использовать любую схему шардирования, которая может быть нетривиальной из-за требований предметной области. Также это является наиболее оптимальным решением, так как данные могут записываться на разные шарды полностью независимо.
Во-вторых, вы можете делать INSERT в Distributed таблицу. В этом случае, таблица будет сама распределять вставляемые данные по серверам. Для того, чтобы писать в Distributed таблицу, у неё должен быть задан ключ шардирования (последний параметр). Также, если шард всего-лишь один, то запись работает и без указания ключа шардирования (так как в этом случае он не имеет смысла).
У каждого шарда в конфигурационном файле может быть задан «вес» (weight). По умолчанию, вес равен единице. Данные будут распределяться по шардам в количестве, пропорциональном весу шарда. Например, если есть два шарда, и у первого выставлен вес 9, а у второго 10, то на первый будет отправляться 9 / 19 доля строк, а на второй - 10 / 19.
У каждого шарда в конфигурационном файле может быть указан параметр internal_replication.
Если он выставлен в true, то для записи будет выбираться первая живая реплика и данные будут писаться на неё. Этот вариант следует использовать, если Distributed таблица «смотрит» на реплицируемые таблицы. То есть, если таблица, в которую будут записаны данные, будет сама заниматься их репликацией.
Если он выставлен в false (по умолчанию), то данные будут записываться на все реплики. По сути, это означает, что Distributed таблица занимается репликацией данных самостоятельно. Это хуже, чем использование реплицируемых таблиц, так как не контролируется консистентность реплик, и они со временем будут содержать немного разные данные.
Для выбора шарда, на который отправляется строка данных, вычисляется выражение шардирования, и берётся его остаток от деления на суммарный вес шардов. Строка отправляется на шард, соответствующий полуинтервалу остатков от prev_weights до prev_weights + weight, где prev_weights - сумма весов шардов с меньшим номером, а weight - вес этого шарда. Например, если есть два шарда, и у первого выставлен вес 9, а у второго 10, то строка будет отправляться на первый шард для остатков из диапазона [0, 9), а на второй - для остатков из диапазона [9, 19).
Выражением шардирование может быть произвольное выражение от констант и столбцов таблицы, возвращающее целое число. Например, вы можете использовать выражение rand() для случайного распределения данных, или UserID - для распределения по остатку от деления идентификатора посетителя (тогда данные одного посетителя будут расположены на одном шарде, что упростит выполнение IN и JOIN по посетителям). Если распределение какого-либо столбца недостаточно равномерное, вы можете обернуть его в хэш функцию: intHash64(UserID).
Простой остаток от деления является довольно ограниченным решением для шардирования и подходит не для всех случаев. Он подходит для среднего и большого объёма данных (десятки серверов), но не для очень больших объёмов данных (сотни серверов и больше). В последнем случае, лучше использовать схему шардирования, продиктованную требованиями предметной области, и не использовать возможность записи в Distributed таблицы.
Запросы SELECT отправляются на все шарды, и работают независимо от того, каким образом данные распределены по шардам (они могут быть распределены полностью случайно). При добавлении нового шарда, можно не переносить на него старые данные, а записывать новые данные с большим весом - данные будут распределены слегка неравномерно, но запросы будут работать корректно и достаточно эффективно.
Беспокоиться о схеме шардирования имеет смысл в следующих случаях:
Запись данных осуществляется полностью асинхронно. При вставке в таблицу, блок данных сначала записывается в файловую систему. Затем, в фоновом режиме отправляются на удалённые серверы при первой возможности. Период отправки регулируется настройками distributed_directory_monitor_sleep_time_ms и distributed_directory_monitor_max_sleep_time_ms. Движок таблиц Distributed
отправляет каждый файл со вставленными данными отдельно, но можно включить пакетную отправку данных настройкой distributed_directory_monitor_batch_inserts. Эта настройка улучшает производительность кластера за счет более оптимального использования ресурсов сервера-отправителя и сети. Необходимо проверять, что данные отправлены успешно, для этого проверьте список файлов (данных, ожидающих отправки) в каталоге таблицы /var/lib/clickhouse/data/database/table/
. Количество потоков для выполнения фоновых задач можно задать с помощью настройки background_distributed_schedule_pool_size.
Если после INSERT-а в Distributed таблицу, сервер перестал существовать или был грубо перезапущен (например, в следствие аппаратного сбоя), то записанные данные могут быть потеряны. Если в директории таблицы обнаружен повреждённый кусок данных, то он переносится в поддиректорию broken и больше не используется.
При выставлении опции max_parallel_replicas выполнение запроса распараллеливается по всем репликам внутри одного шарда. Подробнее смотрите раздел max_parallel_replicas.
_shard_num
— содержит значение shard_num
из таблицы system.clusters
. Тип: UInt32.
Примечание:
Так как табличные функции remote и cluster создают временную таблицу на движке `Distributed`, то в ней также доступен столбец `_shard_num`.
См. также:
общее описание виртуальных столбцов
настройка background_distributed_schedule_pool_size
функции shardNum() и shardCount()
Движок Dictionary
отображает данные словаря как таблицу RT.WideStore.
Рассмотрим для примера словарь products
со следующей конфигурацией:
<dictionaries>
<dictionary>
<name>products</name>
<source>
<odbc>
<table>products</table>
<connection_string>DSN=some-db-server</connection_string>
</odbc>
</source>
<lifetime>
<min>300</min>
<max>360</max>
</lifetime>
<layout>
<flat/>
</layout>
<structure>
<id>
<name>product_id</name>
</id>
<attribute>
<name>title</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>
</dictionary>
</dictionaries>
Запрос данных словаря:
SELECT
name,
type,
key,
attribute.names,
attribute.types,
bytes_allocated,
element_count,
sourceFROM system.dictionaries
WHERE name = 'products'
┌─name─────┬─type─┬─key─────┬─attribute.names─┬─attribute.types─┬─bytes_allocated─┬─element_count──┬─source───────────┐
│ products │ Flat │ UInt64 │ ['title'] │ ['String'] │ 23065376 │ 175032 │ ODBC: .products │
└───────────┴──────┴─────────┴───────────────────┴──────────────────┴───────────────────┴────────────────┴───────────────────┘
В таком виде данные из словаря можно получить при помощи функций dictGet*.
Такое представление неудобно, когда нам необходимо получить данные в чистом виде, а также при выполнении операции JOIN
. Для этих случаев можно использовать движок Dictionary
, который отобразит данные словаря в таблицу.
Синтаксис:
CREATE TABLE %table_name% (%fields%) engine = Dictionary(%dictionary_name%)`
Пример использования:
create table products (product_id UInt64, title String) Engine = Dictionary(products);
Проверим что у нас в таблице?
select * from products limit 1;
┌────product_id─┬─title────────────┐
│ 152689 │ Some item │
└────────────────┴───────────────────┘
Смотрите также:
Движок Merge
(не путайте с движком MergeTree
) не хранит данные самостоятельно, а позволяет читать одновременно из произвольного количества других таблиц. Чтение автоматически распараллеливается. Запись в таблицу не поддерживается. При чтении будут использованы индексы тех таблиц, из которых реально идёт чтение, если они существуют.
CREATE TABLE ... Engine=Merge(db_name, tables_regexp)
Параметры движка:
db_name
— Возможные варианты:
currentDatabase()
,REGEXP(expression)
, где expression
— регулярное выражение для отбора БД.tables_regexp
— регулярное выражение для имен таблиц в указанной БД или нескольких БД.
Регулярные выражения — re2 (поддерживает подмножество PCRE), регистрозависимые. Смотрите замечание об экранировании в регулярных выражениях в разделе «match».
При выборе таблиц для чтения сама Merge
-таблица не будет выбрана, даже если попадает под регулярное выражение, чтобы не возникло циклов. Впрочем, вы можете создать две Merge
-таблицы, которые будут пытаться бесконечно читать данные друг друга, но делать этого не рекомендуется.
Типичный способ использования движка Merge
— работа с большим количеством таблиц типа TinyLog
как с одной.
Пример 1:
Пусть есть две БД ABC_corporate_site
и ABC_store
. Таблица all_visitors
будет содержать ID из таблиц visitors
в обеих БД.
CREATE TABLE all_visitors (id UInt32) ENGINE=Merge(REGEXP('ABC_*'), 'visitors');
Пример 2:
Пусть есть старая таблица WatchLog_old
. Необходимо изменить партиционирование без перемещения данных в новую таблицу WatchLog_new
. При этом в выборке должны участвовать данные обеих таблиц.
CREATE TABLE WatchLog_old(date Date, UserId Int64, EventType String, Cnt UInt64)
ENGINE=MergeTree(date, (UserId, EventType), 8192);
INSERT INTO WatchLog_old VALUES ('2018-01-01', 1, 'hit', 3);
CREATE TABLE WatchLog_new(date Date, UserId Int64, EventType String, Cnt UInt64)
ENGINE=MergeTree PARTITION BY date ORDER BY (UserId, EventType) SETTINGS index_granularity=8192;
INSERT INTO WatchLog_new VALUES ('2018-01-02', 2, 'hit', 3);
CREATE TABLE WatchLog as WatchLog_old ENGINE=Merge(currentDatabase(), '^WatchLog');
SELECT * FROM WatchLog;
┌────────date─┬─UserId─┬─EventType─┬─Cnt─┐
│ 2018-01-01 │ 1 │ hit │ 3 │
└─────────────┴─────────┴────────────┴─────┘
┌────────date─┬─UserId─┬─EventType─┬─Cnt─┐
│ 2018-01-02 │ 2 │ hit │ 3 │
└─────────────┴─────────┴────────────┴─────┘
_table
— содержит имя таблицы, из которой данные были прочитаны. Тип — String.
В секции `WHERE/PREWHERE` можно установить константное условие на столбец `_table` (например, `WHERE _table='xyz'`). В этом случае операции чтения выполняются только для тех таблиц, для которых выполняется условие на значение `_table`, таким образом, столбец `_table` работает как индекс.
См. также:
Управляет данными в одном файле на диске в указанном формате.
Примеры применения:
File(Format)
Format
должен быть таким, который RT.WideStore может использовать и в запросах INSERT
и в запросах SELECT
. Полный список поддерживаемых форматов смотрите в разделе Форматы.
Сервер RT.WideStore не позволяет указать путь к файлу, с которым будет работать File
. Используется путь к хранилищу, определенный параметром path в конфигурации сервера.
При создании таблицы с помощью File(Format)
сервер RT.WideStore создает в хранилище каталог с именем таблицы, а после добавления в таблицу данных помещает туда файл data.Format
.
Можно вручную создать в хранилище каталог таблицы, поместить туда файл, затем на сервере RT.WideStore добавить (ATTACH) информацию о таблице, соответствующей имени каталога и прочитать из файла данные.
Предупреждение:
Будьте аккуратны с этой функциональностью, поскольку сервер RT.WideStore не отслеживает внешние изменения данных. Если в файл будет производиться запись одновременно со стороны сервера RT.WideStore и с внешней стороны, то результат непредсказуем.
Пример:
1. Создадим на сервере таблицу file_engine_table
:
CREATE TABLE file_engine_table (name String, value UInt32) ENGINE=File(TabSeparated)
В конфигурации по умолчанию сервер RT.WideStore создаст каталог /var/lib/clickhouse/data/default/file_engine_table
.
2. Вручную создадим файл /var/lib/clickhouse/data/default/file_engine_table/data.TabSeparated
с содержимым:
$cat data.TabSeparated
one 1
two 2
3. Запросим данные:
SELECT * FROM file_engine_table
┌─name─┬─value─┐
│ one │ 1 │
│ two │ 2 │
└──────┴────────┘
В clickhouse-local движок в качестве параметра принимает не только формат, но и путь к файлу. В том числе можно указать стандартные потоки ввода/вывода цифровым или буквенным обозначением 0
или stdin
, 1
или stdout
. Можно записывать и читать сжатые файлы. Для этого нужно задать дополнительный параметр движка или расширение файла (gz
, br
или xz
).
Пример:
$ echo -e "1,2\n3,4" | clickhouse-local -q "CREATE TABLE table (a Int64, b Int64) ENGINE = File(CSV, stdin); SELECT a, b FROM table; DROP TABLE table"
Детали реализации:
SELECT
, запросы INSERT
могут выполняться только последовательно.INSERT
.INSERT
записывает в конец файла.ALTER
и SELECT...SAMPLE
;При записи в таблицу типа Null, данные игнорируются. При чтении из таблицы типа Null, возвращается пустота.
Тем не менее, есть возможность создать материализованное представление над таблицей типа Null. Тогда данные, записываемые в таблицу, будут попадать в представление.
Представляет собой множество, постоянно находящееся в оперативке. Предназначено для использования в правой части оператора IN (смотрите раздел «Операторы IN»).
В таблицу можно вставлять данные INSERT-ом - будут добавлены новые элементы в множество, с игнорированием дубликатов. Но из таблицы нельзя, непосредственно, делать SELECT. Единственная возможность чтения - использование в правой части оператора IN.
Данные постоянно находятся в оперативке. При INSERT-е, в директорию таблицы на диске, также пишутся блоки вставленных данных. При запуске сервера, эти данные считываются в оперативку. То есть, после перезапуска, данные остаются на месте.
При грубом перезапуске сервера, блок данных на диске может быть потерян или повреждён. В последнем случае, может потребоваться вручную удалить файл с повреждёнными данными.
При создании таблицы, применяются следующие параметры:
Подготовленная структура данных для использования в операциях JOIN.
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
) ENGINE = Join(join_strictness, join_type, k1[, k2, ...])
Смотрите подробное описание запроса CREATE TABLE.
Параметры движка:
join_strictness
– строгость JOIN.join_type
– тип JOIN.k1[, k2, ...]
– ключевые столбцы секции USING
с которыми выполняется операция JOIN
.Вводите параметры join_strictness
и join_type
без кавычек, например, Join(ANY, LEFT, col1)
. Они должны быть такими же как и в той операции JOIN
, в которой таблица будет использоваться. Если параметры не совпадают, RT.WideStore не генерирует исключение и может возвращать неверные данные.
Данные таблиц Join
всегда находятся в оперативной памяти. При вставке строк в таблицу RT.WideStore записывает блоки данных в каталог на диске, чтобы их можно было восстановить при перезапуске сервера.
При аварийном перезапуске сервера блок данных на диске может быть потерян или повреждён. В последнем случае может потребоваться вручную удалить файл с повреждёнными данными.
Для добавления данных в таблицы с движком Join
используйте запрос INSERT
. Если таблица создавалась со строгостью ANY
, то данные с повторяющимися ключами игнорируются. Если задавалась строгость ALL
, то добавляются все строки.
Основные применения Join
таблиц:
JOIN
.Запросы ALTER DELETE
для таблиц с движком Join
выполняются как мутации. При выполнении мутации DELETE
считываются отфильтрованные данные и перезаписываются в оперативную память и на диск.
При создании таблицы применяются следующие настройки:
Таблицы с движком Join
нельзя использовать в операциях GLOBAL JOIN
.
Движок Join
позволяет использовать настройку join_use_nulls в запросе CREATE TABLE
. Необходимо использовать одно и то же значение параметра join_use_nulls
в запросах CRATE TABLE
и SELECT
.
Создание левой таблицы:
CREATE TABLE id_val(`id` UInt32, `val` UInt32) ENGINE = TinyLog;
INSERT INTO id_val VALUES (1,11)(2,12)(3,13);
Создание правой таблицы с движком Join
:
CREATE TABLE id_val_join(`id` UInt32, `val` UInt8) ENGINE = Join(ANY, LEFT, id);
INSERT INTO id_val_join VALUES (1,21)(1,22)(3,23);
Объединение таблиц:
SELECT * FROM id_val ANY LEFT JOIN id_val_join USING (id);
┌─id─┬─val─┬─id_val_join.val─┐
│ 1 │ 11 │ 21 │
│ 2 │ 12 │ 0 │
│ 3 │ 13 │ 23 │
└────┴─────┴───────────────────┘
В качестве альтернативы, можно извлечь данные из таблицы Join
, указав значение ключа объединения:
SELECT joinGet('id_val_join', 'val', toUInt32(1));
┌─joinGet('id_val_join', 'val', toUInt32(1))─┐
│ 21 │
└────────────────────────────────────────────────┘
Удаление данных из таблицы Join
:
ALTER TABLE id_val_join DELETE WHERE id = 3;
┌─id─┬─val─┐
│ 1 │ 21 │
└────┴──────┘
Управляет данными на удаленном HTTP/HTTPS сервере. Данный движок похож на движок File.
Format
должен быть таким, который RT.WideStore может использовать в запросах SELECT
и, если есть необходимость, INSERT
. Полный список поддерживаемых форматов смотрите в разделе Форматы.
URL
должен соответствовать структуре Uniform Resource Locator. По указанному URL должен находится сервер работающий по протоколу HTTP или HTTPS. При этом не должно требоваться никаких дополнительных заголовков для получения ответа от сервера.
Запросы INSERT
и SELECT
транслируются в POST
и GET
запросы соответственно. Для обработки POST
-запросов удаленный сервер должен поддерживать Chunked transfer encoding.
Максимальное количество переходов по редиректам при выполнении HTTP-запроса методом GET можно ограничить с помощью настройки max_http_get_redirects.
Пример:
1. Создадим на сервере таблицу url_engine_table
:
CREATE TABLE url_engine_table (word String, value UInt64)
ENGINE=URL('http://127.0.0.1:12345/', CSV)
2. Создадим простейший http-сервер стандартными средствами языка python3 и запустим его:
from http.server import BaseHTTPRequestHandler, HTTPServer
class CSVHTTPServer(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header('Content-type', 'text/csv')
self.end_headers()
self.wfile.write(bytes('Hello,1\nWorld,2\n', "utf-8"))
if __name__ == "__main__":
server_address = ('127.0.0.1', 12345)
HTTPServer(server_address, CSVHTTPServer).serve_forever()
$ python3 server.py
3. Запросим данные:
SELECT * FROM url_engine_table
┌─word──┬─value─┐
│ Hello │ 1 │
│ World │ 2 │
└───────┴────────┘
ALTER
и SELECT...SAMPLE
;Используется для реализации представлений (подробнее см. запрос CREATE VIEW
). Не хранит данные, а хранит только указанный запрос SELECT
. При чтении из таблицы, выполняет его (с удалением из запроса всех ненужных столбцов).
Используется для реализации материализованных представлений (подробнее см. запрос CREATE VIEW). Для хранения данных, использует другой движок, который был указан при создании представления. При чтении из таблицы, просто использует этот движок.
Хранит данные в оперативке, в несжатом виде. Данные хранятся именно в таком виде, в каком они получаются при чтении. То есть, само чтение из этой таблицы полностью бесплатно. Конкурентный доступ к данным синхронизируется. Блокировки короткие: чтения и записи не блокируют друг друга. Индексы не поддерживаются. Чтение распараллеливается. За счёт отсутствия чтения с диска, разжатия и десериализации данных удаётся достичь максимальной производительности (выше 10 ГБ/сек.) на простых запросах. (Стоит заметить, что во многих случаях, производительность движка MergeTree, почти такая же высокая.) При перезапуске сервера данные из таблицы исчезают и таблица становится пустой. Обычно, использование этого движка таблиц является неоправданным. Тем не менее, он может использоваться для тестов, а также в задачах, где важно достичь максимальной скорости на не очень большом количестве строк (примерно до 100 000 000).
Движок Memory используется системой для временных таблиц - внешних данных запроса (смотрите раздел «Внешние данные для обработки запроса»), для реализации GLOBAL IN
(смотрите раздел «Операторы IN»).
Буферизует записываемые данные в оперативке, периодически сбрасывая их в другую таблицу. При чтении, производится чтение данных одновременно из буфера и из другой таблицы.
Buffer(database, table, num_layers, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes)
Параметры движка:
database
— имя базы данных. Вместо имени базы данных может использоваться константное выражение, возвращающее строку. table
— таблица, в которую сбрасывать данные. num_layers
— уровень параллелизма. Физически таблица будет представлена в виде num_layers
независимых буферов. Рекомендуемое значение — 16. min_time
, max_time
, min_rows
, max_rows
, min_bytes
, max_bytes
— условия для сброса данных из буфера.
Данные сбрасываются из буфера и записываются в таблицу назначения, если выполнены все min
-условия или хотя бы одно max
-условие.
min_time
, max_time
— условие на время в секундах от момента первой записи в буфер.min_rows
, max_rows
— условие на количество строк в буфере.min_bytes
, max_bytes
— условие на количество байт в буфере.При записи, данные вставляются в случайный из num_layers
буферов. Или, если размер куска вставляемых данных достаточно большой (больше max_rows
или max_bytes
), то он записывается в таблицу назначения минуя буфер.
Условия для сброса данных учитываются отдельно для каждого из num_layers
буферов. Например, если num_layers = 16
и max_bytes = 100000000
, то максимальный расход оперативки будет 1.6 GB.
Пример:
CREATE TABLE merge.hits_buffer AS merge.hits ENGINE = Buffer(merge, hits, 16, 10, 100, 10000, 1000000, 10000000, 100000000)
Создаём таблицу merge.hits_buffer такой же структуры как merge.hits и движком Buffer. При записи в эту таблицу, данные буферизуются в оперативке и, в дальнейшем, записываются в таблицу merge.hits. Создаётся 16 буферов. Данные, имеющиеся в каждом из них будут сбрасываться, если прошло сто секунд, или записан миллион строк, или записано сто мегабайт данных; или если одновременно прошло десять секунд и записано десять тысяч строк и записано десять мегабайт данных. Для примера, если записана всего лишь одна строка, то через сто секунд она будет сброшена в любом случае. А если записано много строк, то они будут сброшены раньше.
При остановке сервера, при DROP TABLE или DETACH TABLE, данные из буфера тоже сбрасываются в таблицу назначения.
В качестве имени базы данных и имени таблицы можно указать пустые строки в одинарных кавычках. Это обозначает отсутствие таблицы назначения. В таком случае, при достижении условий на сброс данных, буфер будет просто очищаться. Это может быть полезным, чтобы хранить в оперативке некоторое окно данных.
При чтении из таблицы типа Buffer, будут обработаны данные, как находящиеся в буфере, так и данные из таблицы назначения (если такая есть). Но следует иметь ввиду, что таблица Buffer не поддерживает индекс. То есть, данные в буфере будут просканированы полностью, что может быть медленно для буферов большого размера. (Для данных в подчинённой таблице, будет использоваться тот индекс, который она поддерживает.)
Если множество столбцов таблицы Buffer не совпадает с множеством столбцов подчинённой таблицы, то будут вставлено подмножество столбцов, которое присутствует в обеих таблицах.
Если у одного из столбцов таблицы Buffer и подчинённой таблицы не совпадает тип, то в лог сервера будет записано сообщение об ошибке и буфер будет очищен. То же самое происходит, если подчинённая таблица не существует в момент сброса буфера.
Внимание:
В релизах до 26 октября 2021 года выполнение ALTER на таблице Buffer ломает структуру блоков и вызывает ошибку (см. [#15117] и [#30565], поэтому удаление буфера и его пересоздание — единственный вариант миграции для данного движка. Перед выполнением ALTER на таблице Buffer убедитесь, что в вашей версии эта ошибка устранена.
При нештатном перезапуске сервера, данные, находящиеся в буфере, будут потеряны.
Для таблиц типа Buffer неправильно работают FINAL и SAMPLE. Эти условия пробрасываются в таблицу назначения, но не используются для обработки данных в буфере. В связи с этим, рекомендуется использовать таблицу типа Buffer только для записи, а читать из таблицы назначения.
При добавлении данных в Buffer, один из буферов блокируется. Это приводит к задержкам, если одновременно делается чтение из таблицы.
Данные, вставляемые в таблицу Buffer, попадают в подчинённую таблицу в порядке, возможно отличающимся от порядка вставки, и блоками, возможно отличающимися от вставленных блоков. В связи с этим, трудно корректно использовать таблицу типа Buffer для записи в CollapsingMergeTree. Чтобы избежать проблемы, можно выставить num_layers в 1.
Если таблица назначения является реплицируемой, то при записи в таблицу Buffer будут потеряны некоторые ожидаемые свойства реплицируемых таблиц. Из-за произвольного изменения порядка строк и размеров блоков данных, перестаёт работать дедупликация данных, в результате чего исчезает возможность надёжной exactly once записи в реплицируемые таблицы.
В связи с этими недостатками, таблицы типа Buffer можно рекомендовать к применению лишь в очень редких случаях.
Таблицы типа Buffer используются в тех случаях, когда от большого количества серверов поступает слишком много INSERT-ов в единицу времени, и нет возможности заранее самостоятельно буферизовать данные перед вставкой, в результате чего, INSERT-ы не успевают выполняться.
Заметим, что даже для таблиц типа Buffer не имеет смысла вставлять данные по одной строке, так как таким образом будет достигнута скорость всего лишь в несколько тысяч строк в секунду, тогда как при вставке более крупными блоками, достижимо более миллиона строк в секунду (смотрите раздел «Производительность»).
RT.WideStore позволяет отправить на сервер данные, необходимые для обработки одного запроса, вместе с запросом SELECT. Такие данные будут положены во временную таблицу (см. раздел «Временные таблицы») и смогут использоваться в запросе (например, в операторах IN).
Для примера, если у вас есть текстовый файл с важными идентификаторами посетителей, вы можете загрузить его на сервер вместе с запросом, в котором используется фильтрация по этому списку.
Если вам нужно будет выполнить более одного запроса с достаточно большими внешними данными - лучше не использовать эту функциональность, а загрузить данные в БД заранее.
Внешние данные могут быть загружены как с помощью клиента командной строки (в не интерактивном режиме), так и через HTTP-интерфейс.
В клиенте командной строки, может быть указана секция параметров вида
--external --file=... [--name=...] [--format=...] [--types=...|--structure=...]
Таких секций может быть несколько - по числу передаваемых таблиц.
-
, что обозначает stdin
. Из stdin
может быть считана только одна таблица.Следующие параметры не обязательные:
Должен быть указан один из следующих параметров:
UInt64,String
. Столбцы будут названы _1, _2, …UserID UInt64
, URL String
. Определяет имена и типы столбцов.Файлы, указанные в file, будут разобраны форматом, указанным в format, с использованием типов данных, указанных в types или structure. Таблица будет загружена на сервер, и доступна там в качестве временной таблицы с именем name.
Примеры:
$ echo -ne "1\n2\n3\n" | clickhouse-client --query="SELECT count() FROM test.visits WHERE TraficSourceID IN _data" --external --file=- --types=Int8
849897
$ cat /etc/passwd | sed 's/:/\t/g' | clickhouse-client --query="SELECT shell, count() AS c FROM passwd GROUP BY shell ORDER BY c DESC" --external --file=- --name=passwd --structure='login String, unused String, uid UInt16, gid UInt16, comment String, home String, shell String'
/bin/sh 20
/bin/false 5
/bin/bash 4
/usr/sbin/nologin 1
/bin/sync 1
При использовании HTTP интерфейса, внешние данные передаются в формате multipart/form-data. Каждая таблица передаётся отдельным файлом. Имя таблицы берётся из имени файла. В query_string передаются параметры name_format, name_types, name_structure, где name - имя таблицы, которой соответствуют эти параметры. Смысл параметров такой же, как при использовании клиента командной строки.
Пример:
$ cat /etc/passwd | sed 's/:/\t/g' > passwd.tsv
$ curl -F 'passwd=@passwd.tsv;' 'http://localhost:8123/?query=SELECT+shell,+count()+AS+c+FROM+passwd+GROUP+BY+shell+ORDER+BY+c+DESC&passwd_structure=login+String,+unused+String,+uid+UInt16,+gid+UInt16,+comment+String,+home+String,+shell+String'
/bin/sh 20
/bin/false 5
/bin/bash 4
/usr/sbin/nologin 1
/bin/sync 1
При распределённой обработке запроса, временные таблицы передаются на все удалённые серверы.
Механизм генерации случайных таблиц создает случайные данные для заданной схемы таблицы.
Примеры использования:
ENGINE = GenerateRandom([random_seed [,max_string_length [,max_array_length]]])
Параметры max_array_length
и max_string_length
определяют максимальную длину всех соответственно массивируйте или отображайте столбцы и строки в сгенерированных данных.
Механизм генерации таблиц поддерживает только SELECT
запросы.
Он поддерживает все типы данных, которые могут храниться в таблице, кроме LowCardinality
и AggregateFunction
.
Пример:
1. Настройте generate_engine_table
таблицу:
CREATE TABLE generate_engine_table (name String, value UInt32) ENGINE = GenerateRandom(1, 5, 3)
2. Запрашивайте данные:
SELECT * FROM generate_engine_table LIMIT 3
┌─name─┬──────value─┐
│ c4xJ │ 1412771199 │
│ r │ 1791099446 │
│ 7#$ │ 124312908 │
└──────┴─────────────┘
Подробности реализации:
ALTER
SELECT ... SAMPLE
INSERT
Движки баз данных обеспечивают работу с таблицами.
По умолчанию RT.WideStore использует движок Atomic. Он поддерживает конфигурируемые движки таблиц и диалект SQL.
Также можно использовать следующие движки баз данных:
Позволяет подключаться к базам данных на удалённом MySQL сервере и выполнять запросы INSERT
и SELECT
для обмена данными между ClickHouse и MySQL.
Движок баз данных MySQL
транслирует запросы при передаче на сервер MySQL, что позволяет выполнять и другие виды запросов, например SHOW TABLES
или SHOW CREATE TABLE
.
Не поддерживаемые виды запросов:
RENAME
CREATE TABLE
ALTER
CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
ENGINE = MySQL('host:port', ['database' | database], 'user', 'password')
Параметры движка
host:port
— адрес сервера MySQL.database
— имя базы данных на удалённом сервере.user
— пользователь MySQL.password
— пароль пользователя.
MySQL |
RT.WideStore |
---|---|
НЕПОДПИСАННЫЙ TINYINT | UInt8 |
TINYINT | Int8 |
БЕЗЗНАКОВЫЙ SMALLINT | UInt16 |
SMALLINT | Int16 |
НЕПОДПИСАННЫЙ INT, НЕПОДПИСАННЫЙ MEDIUMINT | UInt32 |
INT, MEDIUMINT | Int32 |
НЕПОДПИСАННЫЙ BIGINT | UInt64 |
BIGINT | Int64 |
ПЛАВАЮЩИЙ | Float32 |
ДВОЙНОЙ | Float64 |
Дата | Дата |
ДАТА-ВРЕМЯ, ВРЕМЕННАЯ МЕТКА | Дата и время |
ДВОИЧНЫЙ ФАЙЛ | Исправленная строка |
Все прочие типы данных преобразуются в String.
Nullable поддержан.
Для лучшей совместимости к глобальным переменным можно обращаться в формате MySQL, как @@identifier
.
Поддерживаются следующие переменные:
version
max_allowed_packet
ПРЕДУПРЕЖДЕНИЕ:
В настоящее время эти переменные реализованы только как "заглушки" и не содержат актуальных данных.
Пример:
SELECT @@version;
Таблица в MySQL:
mysql> USE test;
Database changedmysql> CREATE TABLE `mysql_table` (
-> `int_id` INT NOT NULL AUTO_INCREMENT,
-> `float` FLOAT NOT NULL,
-> PRIMARY KEY (`int_id`));
Query OK, 0 rows affected (0,09 sec)
mysql> insert into mysql_table (`int_id`, `float`) VALUES (1,2);
Query OK, 1 row affected (0,00 sec)
mysql> select * from mysql_table;
+--------+-------+
| int_id | value |
+--------+-------+
| 1 | 2 |
+--------+-------+
1 row in set (0,00 sec)
База данных в RT.WideStore, позволяющая обмениваться данными с сервером MySQL:
CREATE DATABASE mysql_db ENGINE = MySQL('localhost:3306', 'test', 'my_user', 'user_password')
SHOW DATABASES
┌─name─────┐
│ default │
│ mysql_db │
│ system │
└───────────┘
SHOW TABLES FROM mysql_db
┌─name──────────┐
│ mysql_table │
└───────────────┘
SELECT * FROM mysql_db.mysql_table
┌─int_id─┬─value─┐
│ 1 │ 2 │
└─────────┴───────┘
INSERT INTO mysql_db.mysql_table VALUES (3,4)
SELECT * FROM mysql_db.mysql_table
┌─int_id─┬─value─┐
│ 1 │ 2 │
│ 3 │ 4 │
└─────────┴───────┘
Сохраняет таблицы только в оперативной памяти expiration_time_in_seconds
через несколько секунд после последнего доступа. Может использоваться только с таблицами *Log.
Он оптимизирован для хранения множества небольших таблиц *Log, для которых обычно существует большой временной интервал между обращениями.
CREATE DATABASE testlazy ENGINE = Lazy(expiration_time_in_seconds);
Поддерживает неблокирующие запросы DROP TABLE и RENAME TABLE и атомарные запросы EXCHANGE TABLES. Движок Atomic
используется по умолчанию.
CREATE DATABASE test [ENGINE = Atomic];
Каждая таблица в базе данных Atomic
имеет уникальный UUID и хранит данные в папке /clickhouse_path/store/xxx/xxxyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy/
, где xxxyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy
- это UUID таблицы. Обычно UUID генерируется автоматически, но пользователь также может явно указать UUID в момент создания таблицы (однако это не рекомендуется). Для отображения UUID в запросе SHOW CREATE
вы можете использовать настройку show_table_uuid_in_table_create_query_if_not_nil. Результат выполнения в таком случае будет иметь вид:
CREATE TABLE name UUID '28f1c61c-2970-457a-bffe-454156ddcfef' (n UInt64) ENGINE = ...;
Запросы RENAME выполняются без изменения UUID и перемещения табличных данных. Эти запросы не ожидают завершения использующих таблицу запросов и выполняются мгновенно.
При выполнении запроса DROP TABLE
никакие данные не удаляются. Таблица помечается как удаленная, метаданные перемещаются в папку /clickhouse_path/metadata_dropped/
и база данных уведомляет фоновый поток. Задержка перед окончательным удалением данных задается настройкой database_atomic_delay_before_drop_table_sec. Вы можете задать синхронный режим, определяя модификатор SYNC
. Используйте для этого настройку database_atomic_wait_for_drop_and_detach_synchronously. В этом случае запрос DROP
ждет завершения SELECT
, INSERT
и других запросов, которые используют таблицу. Таблица будет фактически удалена, когда она не будет использоваться.
Запрос EXCHANGE атомарно меняет местами две таблицы или два словаря. Например, вместо неатомарной операции:
RENAME TABLE new_table TO tmp, old_table TO new_table, tmp TO old_table;
вы можете использовать один атомарный запрос:
EXCHANGE TABLES new_table AND old_table;
Для таблиц ReplicatedMergeTree рекомендуется не указывать параметры движка - путь в ZooKeeper и имя реплики. В этом случае будут использоваться параметры конфигурации: default_replica_path и default_replica_name. Если вы хотите определить параметры движка явно, рекомендуется использовать макрос {uuid}
. Это удобно, так как автоматически генерируются уникальные пути для каждой таблицы в ZooKeeper.
Смотрите также:
Движок баз данных позволяет подключаться к базе SQLite и выполнять запросы INSERT
и SELECT
для обмена данными между ClickHouse и SQLite.
CREATE DATABASE sqlite_database
ENGINE = SQLite('db_path')
Параметры движка:
db_path
— путь к файлу с базой данных SQLite.
SQLite |
RT.WideStore |
---|---|
INTEGER | Int32 |
REAL | Float32 |
TEXT | String |
BLOB | String |
SQLite хранит всю базу данных (определения, таблицы, индексы и сами данные) в виде единого кроссплатформенного файла на хост-машине. Во время записи SQLite блокирует весь файл базы данных, поэтому операции записи выполняются последовательно. Операции чтения могут быть многозадачными. SQLite не требует управления службами (например, сценариями запуска) или контроля доступа на основе GRANT
и паролей. Контроль доступа осуществляется с помощью разрешений файловой системы, предоставляемых самому файлу базы данных.
Отобразим список таблиц базы данных в ClickHouse, подключенной к SQLite:
CREATE DATABASE sqlite_db ENGINE = SQLite('sqlite.db');
SHOW TABLES FROM sqlite_db;
┌──name───┐
│ table1 │
│ table2 │
└─────────┘
Отобразим содержимое таблицы:
SELECT * FROM sqlite_db.table1;
┌─col1──┬─col2─┐
│ line1 │ 1 │
│ line2 │ 2 │
│ line3 │ 3 │
└────────┴──────┘
Вставим данные в таблицу SQLite из таблицы RT.WideStore:
CREATE TABLE clickhouse_table(`col1` String,`col2` Int16) ENGINE = MergeTree() ORDER BY col2;
INSERT INTO clickhouse_table VALUES ('text',10);
INSERT INTO sqlite_db.table1 SELECT * FROM clickhouse_table;
SELECT * FROM sqlite_db.table1;
┌─col1──┬─col2─┐
│ line1 │ 1 │
│ line2 │ 2 │
│ line3 │ 3 │
│ text │ 10 │
└────────┴──────┘
Позволяет подключаться к БД на удаленном сервере PostgreSQL. Поддерживает операции чтения и записи (запросы SELECT
и INSERT
) для обмена данными между ClickHouse и PostgreSQL.
Позволяет в реальном времени получать от удаленного сервера PostgreSQL информацию о таблицах БД и их структуре с помощью запросов SHOW TABLES
и DESCRIBE TABLE
.
Поддерживает операции изменения структуры таблиц (ALTER TABLE ... ADD|DROP COLUMN
). Если параметр use_table_cache
(см. ниже раздел Параметры движка) установлен в значение 1
, структура таблицы кешируется, и изменения в структуре не отслеживаются, но будут обновлены, если выполнить команды DETACH
и ATTACH
.
CREATE DATABASE test_database
ENGINE = PostgreSQL('host:port', 'database', 'user', 'password'[, `schema`, `use_table_cache`]);
Параметры движка:
host:port
— адрес сервера PostgreSQL.database
— имя удаленной БД.user
— пользователь PostgreSQL.password
— пароль пользователя.schema
— схема PostgreSQL.use_table_cache
— определяет кеширование структуры таблиц БД. Необязательный параметр. Значение по умолчанию: 0
.
PostgreSQL |
RT.WideStore |
---|---|
DATE | Date |
TIMESTAMP | DateTime |
REAL | Float32 |
DOUBLE | Float64 |
DECIMAL, NUMERIC | Decimal |
SMALLINT | Int16 |
INTEGER | Int32 |
BIGINT | Int64 |
SERIAL | UInt32 |
BIGSERIAL | UInt64 |
TEXT, CHAR | String |
INTEGER | Nullable(Int32) |
ARRAY | Array |
Обмен данными между БД RT.WideStore и сервером PostgreSQL:
CREATE DATABASE test_database
ENGINE = PostgreSQL('postgres1:5432', 'test_database', 'postgres', 'mysecretpassword', 1);
SHOW DATABASES;
┌─name───────────┐
│ default │
│ test_database │
│ system │
└────────────────┘
SHOW TABLES FROM test_database;
┌─name───────┐
│ test_table │
└─────────────┘
Чтение данных из таблицы PostgreSQL:
SELECT * FROM test_database.test_table;
┌─id─┬─value─┐
│ 1 │ 2 │
└────┴────────┘
Запись данных в таблицу PostgreSQL:
INSERT INTO test_database.test_table VALUES (3,4);
SELECT * FROM test_database.test_table;
┌─int_id─┬─value─┐
│ 1 │ 2 │
│ 3 │ 4 │
└─────────┴───────┘
Пусть структура таблицы была изменена в PostgreSQL:
postgre> ALTER TABLE test_table ADD COLUMN data Text
Поскольку при создании БД параметр use_table_cache
был установлен в значение 1
, структура таблицы в ClickHouse была кеширована и поэтому не изменилась:
DESCRIBE TABLE test_database.test_table;
┌─name───┬─type───────────────┐
│ id │ Nullable(Integer) │
│ value │ Nullable(Integer) │
└─────────┴────────────────────┘
После того как таблицу «отцепили» и затем снова «прицепили», структура обновилась:
DETACH TABLE test_database.test_table;
ATTACH TABLE test_database.test_table;
DESCRIBE TABLE test_database.test_table;
┌─name───┬─type───────────────┐
│ id │ Nullable(Integer) │
│ value │ Nullable(Integer) │
│ data │ Nullable(String) │
└─────────┴────────────────────┘
Движок основан на движке Atomic. Он поддерживает репликацию метаданных через журнал DDL, записываемый в ZooKeeper и выполняемый на всех репликах для данной базы данных.
На одном сервере ClickHouse может одновременно работать и обновляться несколько реплицированных баз данных. Но не может существовать нескольких реплик одной и той же реплицированной базы данных.
CREATE DATABASE testdb ENGINE = Replicated('zoo_path', 'shard_name', 'replica_name') [SETTINGS ...]
Параметры движка:
zoo_path
— путь в ZooKeeper. Один и тот же путь ZooKeeper соответствует одной и той же базе данных.
shard_name
— Имя шарда. Реплики базы данных группируются в шарды по имени.
replica_name
— Имя реплики. Имена реплик должны быть разными для всех реплик одного и того же шарда.
"ПРЕДУПРЕЖДЕНИЕ"
Для таблиц ReplicatedMergeTree если аргументы не заданы, то используются аргументы по умолчанию:
/clickhouse/tables/{uuid}/{shard}
и{replica}
. Они могут быть изменены в серверных настройках: default_replica_path и default_replica_name. Макрос{uuid}
раскрывается вUUID
таблицы,{shard}
и{replica}
— в значения из конфига сервера. В будущем появится возможность использовать значенияshard_name
иreplica_name
аргументов движка базы данныхReplicated
.
DDL-запросы с базой данных Replicated
работают похожим образом на ON CLUSTER запросы, но с небольшими отличиями.
Сначала DDL-запрос пытается выполниться на инициаторе (том хосте, который изначально получил запрос от пользователя). Если запрос не выполнился, то пользователь сразу получает ошибку, другие хосты не пытаются его выполнить. Если запрос успешно выполнился на инициаторе, то все остальные хосты будут автоматически делать попытки выполнить его. Инициатор попытается дождаться выполнения запроса на других хостах (не дольше distributed_ddl_task_timeout) и вернёт таблицу со статусами выполнения запроса на каждом хосте.
Поведение в случае ошибок регулируется настройкой distributed_ddl_output_mode, для Replicated
лучше выставлять её в null_status_on_timeout
— т.е. если какие-то хосты не успели выполнить запрос за distributed_ddl_task_timeout, то вместо исключения для них будет показан статус NULL
в таблице.
В системной таблице system.clusters есть кластер с именем, как у реплицируемой базы, который состоит из всех реплик базы. Этот кластер обновляется автоматически при создании/удалении реплик, и его можно использовать для Distributed таблиц.
При создании новой реплики базы, эта реплика сама создаёт таблицы. Если реплика долго была недоступна и отстала от лога репликации — она сверяет свои локальные метаданные с актуальными метаданными в ZooKeeper, перекладывает лишние таблицы с данными в отдельную нереплицируемую базу (чтобы случайно не удалить что-нибудь лишнее), создаёт недостающие таблицы, обновляет имена таблиц, если были переименования. Данные реплицируются на уровне ReplicatedMergeTree
, т.е. если таблица не реплицируемая, то данные реплицироваться не будут (база отвечает только за метаданные).
Запросы ALTER TABLE ATTACH|FETCH|DROP|DROP DETACHED|DETACH PARTITION|PART
допустимы, но не реплицируются. Движок базы данных может только добавить/извлечь/удалить партицию или кусок нынешней реплики. Однако если сама таблица использует движок реплицируемой таблицы, тогда данные будут реплицированы после применения ATTACH
.
Создадим реплицируемую базу на трех хостах:
node1 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','shard1','replica1');
node2 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','shard1','other_replica');
node3 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','other_shard','{replica}');
Выполним DDL-запрос на одном из хостов:
CREATE TABLE r.rmt (n UInt64) ENGINE=ReplicatedMergeTree ORDER BY n;
Запрос выполнится на всех остальных хостах:
┌─────hosts─────────────┬──status─┬──error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ shard1|replica1 │ 0 │ │ 2 │ 0 │
│ shard1|other_replica │ 0 │ │ 1 │ 0 │
│ other_shard|r1 │ 0 │ │ 0 │ 0 │
└────────────────────────┴──────────┴───────┴───────────────────────┴────────────────────┘
Кластер в системной таблице system.clusters
:
SELECT cluster, shard_num, replica_num, host_name, host_address, port, is_local
FROM system.clusters WHERE cluster='r';
┌─cluster─┬─shard_num─┬─replica_num─┬─host_name─┬─host_address─┬─port─┬─is_local─┐
│ r │ 1 │ 1 │ node3 │ 127.0.0.1 │ 9002 │ 0 │
│ r │ 2 │ 1 │ node2 │ 127.0.0.1 │ 9001 │ 0 │
│ r │ 2 │ 2 │ node1 │ 127.0.0.1 │ 9000 │ 1 │
└──────────┴────────────┴──────────────┴────────────┴───────────────┴───────┴──────────┘
Создадим распределенную таблицу и вставим в нее данные:
node2 :) CREATE TABLE r.d (n UInt64) ENGINE=Distributed('r','r','rmt', n % 2);
node3 :) INSERT INTO r SELECT * FROM numbers(10);
node1 :) SELECT materialize(hostName()) AS host, groupArray(n) FROM r.d GROUP BY host;
┌─hosts─┬─groupArray(n)─┐
│ node1 │ [1,3,5,7,9] │
│ node2 │ [0,2,4,6,8] │
└────────┴────────────────┘
Добавление реплики:
node4 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','other_shard','r2');
Новая реплика автоматически создаст все таблицы, которые есть в базе, а старые реплики перезагрузят из ZooKeeper-а конфигурацию кластера:
┌─cluster─┬─shard_num─┬─replica_num─┬─host_name─┬─host_address──┬─port─┬─is_local─┐
│ r │ 1 │ 1 │ node3 │ 127.0.0.1 │ 9002 │ 0 │
│ r │ 1 │ 2 │ node4 │ 127.0.0.1 │ 9003 │ 0 │
│ r │ 2 │ 1 │ node2 │ 127.0.0.1 │ 9001 │ 0 │
│ r │ 2 │ 2 │ node1 │ 127.0.0.1 │ 9000 │ 1 │
└──────────┴────────────┴──────────────┴────────────┴───────────────┴───────┴───────────┘
Распределенная таблица также получит данные от нового хоста:
node2 :) SELECT materialize(hostName()) AS host, groupArray(n) FROM r.d GROUP BY host;
┌─hosts─┬─groupArray(n)─┐
│ node2 │ [1,3,5,7,9] │
│ node4 │ [0,2,4,6,8] │
└────────┴────────────────┘
Это экспериментальный движок, который не следует использовать в продакшене.
Создает базу данных RT.WideStore со всеми таблицами, существующими в MySQL, и всеми данными в этих таблицах.
Сервер RT.WideStore работает как реплика MySQL. Он читает файл binlog и выполняет DDL and DML-запросы.
CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
ENGINE = MaterializedMySQL('host:port', ['database' | database], 'user', 'password') [SETTINGS ...]
Параметры движка
host:port
— адрес сервера MySQL.database
— имя базы данных на удалённом сервере.user
— пользователь MySQL.password
— пароль пользователя.Настройки движка
max_rows_in_buffer
— максимальное количество строк, содержимое которых может кешироваться в памяти (для одной таблицы и данных кеша, которые невозможно запросить). При превышении количества строк, данные будут материализованы. Значение по умолчанию: 65 505
.max_bytes_in_buffer
— максимальное количество байтов, которое разрешено кешировать в памяти (для одной таблицы и данных кеша, которые невозможно запросить). При превышении количества строк, данные будут материализованы. Значение по умолчанию: 1 048 576
.max_rows_in_buffers
— максимальное количество строк, содержимое которых может кешироваться в памяти (для базы данных и данных кеша, которые невозможно запросить). При превышении количества строк, данные будут материализованы. Значение по умолчанию: 65 505
.max_bytes_in_buffers
— максимальное количество байтов, которое разрешено кешировать данным в памяти (для базы данных и данных кеша, которые невозможно запросить). При превышении количества строк, данные будут материализованы. Значение по умолчанию: 1 048 576
.max_flush_data_time
— максимальное время в миллисекундах, в течение которого разрешено кешировать данные в памяти (для базы данных и данных кеша, которые невозможно запросить). При превышении количества указанного периода, данные будут материализованы. Значение по умолчанию: 1000
.max_wait_time_when_mysql_unavailable
— интервал между повторными попытками, если MySQL недоступен. Указывается в миллисекундах. Отрицательное значение отключает повторные попытки. Значение по умолчанию: 1000
.allows_query_when_mysql_lost
— признак, разрешен ли запрос к материализованной таблице при потере соединения с MySQL. Значение по умолчанию: 0
(false
).CREATE DATABASE mysql ENGINE = MaterializedMySQL('localhost:3306', 'db', 'user', '***')
SETTINGS
allows_query_when_mysql_lost=true,
max_wait_time_when_mysql_unavailable=10000;
Настройки на стороне MySQL-сервера
Для правильной работы MaterializedMySQL
следует обязательно указать на сервере MySQL следующие параметры конфигурации:
default_authentication_plugin = mysql_native_password
— MaterializedMySQL
может авторизоваться только с помощью этого метода.
gtid_mode = on
— ведение журнала на основе GTID является обязательным для обеспечения правильной репликации.
:::note "Внимание" При включении gtid_mode
вы также должны указать enforce_gtid_consistency = on
. :::
При работе с движком баз данных MaterializedMySQL
используются таблицы семейства ReplacingMergeTree с виртуальными столбцами _sign
и _version
.
_version
— счетчик транзакций. Тип UInt64._sign
— метка удаления. Тип Int8. Возможные значения:1
— строка не удалена,-1
— строка удалена.
MySQL |
RT.WideStore |
---|---|
TINY | Int8 |
SHORT | Int16 |
INT24 | Int32 |
LONG | UInt32 |
LONGLONG | UInt64 |
FLOAT | Float32 |
DOUBLE | Float64 |
DECIMAL, NEWDECIMAL | Decimal |
DATE, NEWDATE | Date |
DATETIME, TIMESTAMP | DateTime |
DATETIME2, TIMESTAMP2 | DateTime64 |
ENUM | Enum |
STRING | String |
VARCHAR, VAR_STRING | String |
BLOB | String |
BINARY | FixedString |
Тип Nullable поддерживается.
Другие типы не поддерживаются. Если таблица MySQL содержит столбец другого типа, RT.WideStore выдаст исключение "Неподдерживаемый тип данных" ("Unhandled data type") и остановит репликацию.
Кроме ограничений на типы данных, существует несколько ограничений по сравнению с базами данных MySQL, которые следует решить до того, как станет возможной репликация:
PRIMARY KEY
.ENUM
вне диапазона значений (определяется размерностью ENUM
), не будет работать.DDL-запросы в MySQL конвертируются в соответствующие DDL-запросы в RT.WideStore (ALTER, CREATE, DROP, RENAME). Если RT.WideStore не может конвертировать какой-либо DDL-запрос, он его игнорирует.
Данные являются неизменяемыми со стороны пользователя RT.WideStore, но автоматически обновляются путём репликации следующих запросов из MySQL:
Запрос INSERT
конвертируется в RT.WideStore в INSERT
с _sign=1
.
Запрос DELETE
конвертируется в RT.WideStore в INSERT
с _sign=-1
.
Запрос UPDATE
конвертируется в RT.WideStore в INSERT
с _sign=-1
и INSERT
с _sign=1
.
Запрос SELECT
из таблиц движка MaterializedMySQL
имеет некоторую специфику:
Если в запросе SELECT
напрямую не указан столбец _version
, то используется модификатор FINAL. Таким образом, выбираются только строки с MAX(_version)
.
Если в запросе SELECT
напрямую не указан столбец _sign
, то по умолчанию используется WHERE _sign=1
. Таким образом, удаленные строки не включаются в результирующий набор.
Результат включает комментарии к столбцам, если они существуют в таблицах базы данных MySQL.
Секции PRIMARY KEY
и INDEX
в MySQL конвертируются в кортежи ORDER BY
в таблицах RT.WideStore.
В таблицах RT.WideStore данные физически хранятся в том порядке, который определяется секцией ORDER BY
. Чтобы физически перегруппировать данные, используйте материализованные представления.
Примечание:
_sign=-1
физически не удаляются из таблиц.UPDATE/DELETE
не поддерживаются движком MaterializedMySQL
.MaterializedMySQL
запрещены.MaterializedMySQL
влияет настройка optimize_on_insert. Когда таблица на MySQL сервере меняется, происходит слияние данных в соответсвующей таблице в базе данных MaterializedMySQL
.Запросы в MySQL:
mysql> CREATE DATABASE db;
mysql> CREATE TABLE db.test (a INT PRIMARY KEY, b INT);
mysql> INSERT INTO db.test VALUES (1, 11), (2, 22);
mysql> DELETE FROM db.test WHERE a=1;
mysql> ALTER TABLE db.test ADD COLUMN c VARCHAR(16);
mysql> UPDATE db.test SET c='Wow!', b=222;
mysql> SELECT * FROM test;
+---+------+------+
| a | b | c |
+---+------+------+
| 2 | 222 | Wow! |
+---+------+------+
База данных в RT.WideStore, обмен данными с сервером MySQL:
База данных и созданная таблица:
CREATE DATABASE mysql ENGINE = MaterializedMySQL('localhost:3306', 'db', 'user', '***');
SHOW TABLES FROM mysql;
┌─name─┐
│ test │
└──────┘
После вставки данных:
SELECT * FROM mysql.test;
┌─a─┬──b─┐
│ 1 │ 11 │
│ 2 │ 22 │
└───┴────┘
После удаления данных, добавления столбца и обновления:
SELECT * FROM mysql.test;
┌─a─┬───b─┬─c─────┐
│ 2 │ 222 │ Wow! │
└───┴──────┴──────┘