При разработке расширений PostgreSQL иногда требуется запустить отдельный процесс, который выполняет какие-то действия в фоне, без участия пользователя. Такой процесс называется background worker . Давайте разберемся, как все это устроено.
Background worker’ы запускаются процессом postmaster, так же, как и в случае с уже знакомыми нам checkpointer, walwriter и другими процессами СУБД. Для запуска background worker’ов предусмотрен специальный API для расширений, который мы рассмотрим далее. Стоит отметить, что хотя checkpointer и другие подобные процессы качественно похожи на background worker’ы, это другой тип процессов. Они не запускаются через API для запуска background worker’ов.
Есть два способа запустить background worker’а:
- Через вызов
RegisterBackgroundWorker()
. Расширение должно быть подгружено в postmaster при помощиshared_preload_libraries
. Вызов функции должен производиться из_PG_init()
; - При помощи вызова
RegisterDynamicBackgroundWorker()
из обычного бэкенда;
Если вы не понимаете, о каких таких shared_preload_libraries
и _PG_init()
идет речь, ознакомьтесь с постом Расширения PostgreSQL: разделяемая память и локи .
В исходном коде PostgreSQL есть расширение worker_spi , демонстрирующее оба метода запуска. Посмотрим на него в действии:
USE_PGXS = 1 make
USE_PGXS = 1 make install
В postgresql.conf дописываем:
worker_spi.database = eax
worker_spi.total_workers = 1
Перезапускаем СУБД:
Должны увидеть таблицу counted
в схеме schema1
:
type | value
——+——-
Если теперь записать туда таких данных:
INSERT 0 2
eax=# SELECT * FROM schema1.counted;
type | value
——-+——-
total | 0
delta | 1
(2 rows)
… то через какое-то время содержимое таблицы само собой изменится:
type | value
——-+——-
total | 1
(1 row)
Это созданный расширением background worker увидел изменения и в фоне обновил содержимое таблицы. Заметьте, что для текущей базы данных мы даже не делали CREATE EXTENSION
. Как же это работает?
Поскольку расширение прописано в shared_preload_libraries
, при запуске СУБД оно загружается postmaster’ом и управление передается в _PG_init()
.
В упрощенном виде реализация функции выглядит так:
if ( ! process_shared_preload_libraries_in_progress )
return ;
memset ( & worker , 0 , sizeof ( worker ) ) ;
/*
* Воркер имеет доступ к разделяемой памяти и может
* ходить в таблицы
*/
worker. bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION ;
/*
* Воркер запускается, когда система переходит
* в нормальное рабочее состояние
*/
worker. bgw_start_time = BgWorkerStart_RecoveryFinished ;
/* В случае ошибки воркер не перезапускается */
worker. bgw_restart_time = BGW_NEVER_RESTART ;
/* Название динамической библиотеки с кодом воркера */
sprintf ( worker. bgw_library_name , «worker_spi» ) ;
/* Имя функции, которой следует передать управление */
sprintf ( worker. bgw_function_name , «worker_spi_main» ) ;
/*
* Здесь можно указать id процесса, которому postmaster
* будет посылать SIGUSR1, когда воркер запускается или
* завершается
*/
worker. bgw_notify_pid = 0 ;
/* Имя этого конкретного воркера */
snprintf ( worker. bgw_name , BGW_MAXLEN , «worker_spi worker %d» , 1 ) ;
/* Имя группы/типа воркеров */
snprintf ( worker. bgw_type , BGW_MAXLEN , «worker_spi» ) ;
/*
* Аргумент, который будет передан воркеру. Дополнительные
* 128 байт (BGW_EXTRALEN) можно положить в bgw_extra.
*/
worker. bgw_main_arg = Int32GetDatum ( 1 ) ;
/* Просим postmaster запустить воркера */
RegisterBackgroundWorker ( & worker ) ;
Здесь все достаточно просто. Значение bgw_type
используется в pg_stat_activity:
… а bgw_name
— непосредственно в имени процесса:
…
56108 ? Ss 0:00 postgres: worker_spi worker 1
…
При запуске воркера управление передается в worker_spi_main()
. Упрощенно код функции выглядит таким образом:
worker_spi_main ( Datum main_arg )
{
/*
* Это аргумент, переданный через bgw_main_arg.
* К значению bgw_extra можно получить доступ
* через глобальную переменную MyBgworkerEntry.
*/
int index = DatumGetInt32 ( main_arg ) ;
/* Первым делом нужно указать обработчики сигналов… */
pqsignal ( SIGHUP , SignalHandlerForConfigReload ) ;
pqsignal ( SIGTERM , die ) ;
/* … и разблокировать обработку сигналов */
BackgroundWorkerUnblockSignals ( ) ;
/*
* Подключаемся к БД, с которой хотим работать.
* Также есть BackgroundWorkerInitializeConnectionByOid()
*/
BackgroundWorkerInitializeConnection ( /* (тут имя БД) */ ,
NULL , 0 ) ;
/* Основной цикл */
for ( ;; )
{
( void ) WaitLatch ( MyLatch ,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH ,
/* (здесь время в миллисекундах) */ ,
PG_WAIT_EXTENSION ) ;
ResetLatch ( MyLatch ) ;
/*
* Обработать прерывания, если они были получены,
* пока процесс спал. Прерывания в PostgreSQL —
* это не то же самое, что сигналы, хотя некоторые
* прерывания реализованы на основе сигналов.
* Подробности смотри в src/include/miscadmin.h
*/
CHECK_FOR_INTERRUPTS ( ) ;
/*
* Если придет SIGHUP, его обработчик проставит переменной
* ConfigReloadPending значение true и выставит защелку
* MyLatch, что приведет к пробуждению процесса
*/
if ( ConfigReloadPending )
{
ConfigReloadPending = false ;
/* Перечитать конфиг */
ProcessConfigFile ( PGC_SIGHUP ) ;
}
/* (тут вся полезная нагрузка воркера) */
}
}
С защелками и почему спать на них выгоднее, чем использовать pg_usleep()
, ранее мы разобрались в рамках поста Внутренности PostgreSQL: разделяемые буферы .
Итак, мы поняли, как запускается воркер, а также где у него находится основной цикл. Осталось только понять, как воркер ходит в таблицы. А в таблицы он ходит по обычному SPI . Только здесь есть одна особенность, специфичная именно для background worker’ов.
Чтобы сходить в базу данных по SPI, код должен выглядеть так:
* Имя функции говорит само за себя. Вызов запоминает
* время начала исполнения следующего SQL-выражения.
* Вызов может быть не лишено смысла делать перед
* исполнением каждого нового выражения
*/
SetCurrentStatementStartTimestamp ( ) ;
/*
* Создаем новую транзакцию. Перед этим вызовом мы
* обязаны сделать SetCurrentStatementStartTimestamp().
* Сохраненное время используется как время начала
* исполнения транзакции
*/
StartTransactionCommand ( ) ;
/*
* Этот вызов создает активный снэпшот. Последний необходим,
* чтобы последующие запросы видели MVCC данные, с которыми
* они могли бы работать
*/
PushActiveSnapshot ( GetTransactionSnapshot ( ) ) ;
/*
* Теперь, имея транзакцию, устанавливаем самое обычное
* соединение с менеджером SPI. Порядок вызова SPI_connect()
* и PushActiveSnapshot() несущественен.
*/
SPI_connect ( ) ;
/* (здесь обычный код хождения по SPI) */
/*
* Закрываем соединение с SPI менеджером, как обычно.
* Делать это перед elog(ERROR), кстати, не обязательно
*/
SPI_finish ( ) ;
/*
* Снимаем активный снэпшот со стека снэпшотов, освобождаем
* сопутствующие ресурсы
*/
PopActiveSnapshot ( ) ;
/* Коммитим транзакцию */
CommitTransactionCommand ( ) ;
Когда SPI используется в обычных хранимках на языке C, то ничего этого делать не нужно. Ведь к моменту вызова хранимки уже есть как активная транзакция, так и активный снэпшот. Но в background worker’е их приходится создавать вручную.
Как видите, в использовании background worker’ов нет ничего сверх сложного. Пример создания воркера через вызов RegisterDynamicBackgroundWorker()
вы найдете в том же worker_spi.с . Этот код не сложнее рассмотренного выше. Если же вам хочется примеров посложнее, попробуйте поизучать исходники расширения TimescaleDB . В нем background worker’ы используются довольно активно.
Дополнение: Расширения PostgreSQL: конфигурационные параметры