Tarantool Data Grid + Java = …
В последнее время всё большую популярность набирает Tarantool — платформа in-memory вычислений с гибкой схемой данных, включающая в себя NoSQL-базу данных и сервер приложений. В этой статье я хочу рассказать об одной из его реализаций — Tarantool Data Grid (TDG).
Что такое Tarantool Data Grid?
Tarantool Data Grid (TDG) — это, по сути, три компонента в одной коробке:
- Сам Tarantool
- Cartridge
- UI
Прежде чем работать c TDG, необходимо его настроить и сконфигурировать. Настройка осуществляется в UI и включает в себя настройку кластера Tarantool (собственно, это именно то, что делает Cartridge). Затем нужно загрузить в TDG набор файлов конфигурации, включающий в себя:
- Модель (файл model.avsc) – обязательно;
- Собственно файл конфигурации TDG (файл config.yml) – обязательно;
- Пользовательские lua-скрипты – при необходимости.
Основой для работы с TDG является модель — местный эквивалент схемы данных реляционных БД. Модель TDG представляет собой avro-схему примерно следующего вида:
[
{
"name": "User",
"type": "record",
"fields": [
{
"name": "user_id",
"type": "long"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": [
"null",
"long"
]
},
{
"name": "sex",
"type": [
"null",
"boolean"
]
}
],
"indexes": [
"user_guid"
]
},
{
"name": "Address",
"type": "record",
"fields": [
{
"name": "address_guid",
"type": "string"
},
{
"name": "city",
"type": "string"
},
{
"name": "street",
"type": "string"
},
{
"name": "house",
"type": "string"
},
{
"name": "apartment",
"type": [
"null",
"long"
]
}
],
"indexes": [
"address_guid"
]
}
]
Как видно, модель представляет собой avro-описание спейсов Tarantool, в которых будут храниться данные соответствующих типов. Спейсы будут созданы автоматически при загрузке модели в TDG. В нашем случае это спейсы User и Address.
У каждого спейса заданы индексы (в данном примере это guid). Но по желанию или необходимости индексы могут быть и составными. Для этого можно в модели написать конструкцию вида:
"indexes": [
{
"name": "primary",
"parts": [
“field_1”,
"field_2",
…
]
}
]
Но модель — это далеко не всё. Обязательным элементом конфигурации TDG является корректно заполненный файл config.yml, который должен быть «отдан» TDG вместе с моделью. Заполнение config.yml — отдельная тема, всестороннее рассмотрение которой заслуживает отдельной статьи. Здесь мы ограничимся лишь отдельным примером, о котором я расскажу немного позже.
Для взаимодействия с TDG из приложений, разработанных на различных языках, существуют т.н. коннекторы. Есть коннектор и для Java + Spring, представляющий собой реализацию спецификации JPA — spring-data-tarantool. Для его подключения в файле pom.xml напишем его зависимость:
<dependency>
<groupId>io.tarantool</groupId>
<artifactId>spring-data-tarantool</artifactId>
<version>0.5.2</version>
</dependency>
Пользоваться библиотекой довольно просто (особенно, если вы уже имели дело со Spring Data JPA).
Как обычно, описываем классы сущностей (сейчас ограничимся сущностью User из нашей модели):
@Tuple(“user”)
public class User {
@Id
@Field("user_id")
String id;
String name;
Integer age;
@Field("sex")
Boolean gender;
}
Как видите, аннотация @Field повешена не на все поля: в соответствии с принципами JPA маппинг этих полей будет производиться на основе полей класса сущности — а они совпадают с соответствующими полями модели. Там же, где имена полей не совпадают, ставим аннотацию.
Пишем репозиторий:
public interface UserRepository extends CrudRepository<User, Long> {
}
И всё, теперь мы можем обращаться к Tarantool так, как обычно и делаем, используя Spring Data JPA и стандартные методы jpa-репозитория. Например:
@Service
public class MyService {
private final UserRepository repository;
@Autowired
public MyService(UserRepository repository) {
this.repository = repository;
}
public void myMethod() {
User user = new User();
user.setName("Alexey Petrov");
user.setAge(28);
user.setGender(true)
User saved = repository.save(user);
List<User> userList = repository.findAll();
}
}
Поставка TDG включает в себя коробочный модуль repository, в котором имеются реализации стандартных CRUD-методов. Методы JPA обращаются именно к ним. Мы также имеем возможность обратиться к методам модуля repository непосредственно. Для этого будем использовать аннотацию @Query. Например, для записи в спейс User мы можем использовать следующий метод:
@Query(function = "repository.put")
List<User> put(String typeName, User entity, Map<String, ?> options, List<?> context, Map<?, ?> credentials);
И тогда для записи очередного пользователя мы вызовем метод примерно так:
User myUser = …. // вызов метода создания юзера
List<User> saved = repository.put(“user”, myUser, new HashMap<>(), new HashMap<>())
Но что, если нам этого мало, и требуется какая-то дополнительная кастомная логика? И при этом желательно чтобы всё происходило в рамках одного обращения к TDG, а не нескольких? Здесь нам помогут функции, написанные на языке lua (именно этот язык выступает в качестве основного языка запросов Tarantool) и хранимые в TDG. Как уже говорилось выше, такие lua-скрипты загружаются в TDG вместе с конфигурационными файлами.
Например, мы написали некую функцию (на всякий случай уточню: в lua это называется именно «функции», а не «методы») do_something, и поместили её в файл-модуль my_module.lua:
local log = require('log')
local function do_something(m)
log.info(‘Function do_something is called! M = ’ + m)
end
return {
do_something = do_something
}
Теперь мы хотим вызывать её в нашем Java-сервисе. Как это сделать?
Для этого нужно произвести дополнительное конфигурирование TDG. Чтобы дать возможность вызывать нашу функцию извне, в файле config.yml напишем следующее:
services:
do_something:
doc: "test function"
function: my_module.do_something
return_type: any
args:
m: string
Тем самым мы указываем, что снаружи можно вызвать из TDG сервис do_something. При его вызове будет вызвана функция do_something из файла my_module.lua, и эта функция принимает на вход строку, а возвращает что угодно (в нашем случае — ничего).
За вызов сервисов в TDG отвечает встроенный метод call_service. Воспользуемся им. В Java-сервисе в репозиторий добавим вот такой метод:
@Query(function = "call_service")
List<T> callService(String name, Map<String, ?> args, Map<?, ?> options);
И теперь в нужном месте мы можем вызвать наш сервис следующим образом:
public void callTDGService(String str) {
Map<String, String> args = new HashMap<>();
args.put(“m”, “hello, TDG!”)
service.callService("do_something", args, new HashMap<>());
}
Но и это еще не всё. TDG позволяет осуществить последовательный «конвейерный» вызов, и, таким образом, одномоментно выполнять такие действия, как приём, обработка, сохранение и репликация входных данных. Для этого в TDG существует функция tarantool_protocol_process. Вызывается она точно так же, как и любая другая функция TDG. Пишем в репозитории метод:
@Query(function = "tarantool_protocol_process")
String process(RawUser user, Map<?, ?> options);
Предположим, что мы хотим отправлять в TDG инстанс некоего класса RawUser, там в соответствии с какой-то логикой преобразовывать его в уже знакомый нам класс User, сохранять его в соответствующий спейс, после чего реплицировать эти данные в имеющееся у нас резервное хранилище (например, БД PostgreSQL). Здесь-то нам и поможет функция tarantool_protocol_process. Она принимает наш объект и помещает его в конвейерную очередь, где он будет обрабатываться согласно тому, как мы эту очередь сконфигурируем. А значит, приступим к конфигурации. Снова открываем наш файл config.yml и напишем там следующее:
connector:
input:
- name: tarantool_protocol
type: tarantool_protocol
routing_key: user
output:
- name: user
type: dummy
input_processor:
handlers:
- key: user
function: my_module.create_user_from_raw_user
storage:
- key: user
type: User
output_processor:
user:
handlers:
- function: pg_replicator.save_user
outputs:
- user
odbc:
- dsn: DRIVER=/usr/pgsql-13/lib/psqlodbc.so;SERVER=<url>;PORT=<port>;DATABASE=<db-name>;UID=<user>;PWD=<password>;
name: postgres
Сначала мы настроили коннекторы для получения (input) и для возвращения (output) данных. Здесь есть очень важный момент – routing_key. Именно по нему очередь будет отличать ваши данные от всех остальных. Этот ключ можно задать самим первым действием, когда объект попадет в очередь. Но мы ограничимся тем, который будет ему назначен по умолчанию (в нашем случае это user).
Далее конфигурируем input_processor, т.е. действия, которые будут производиться с объектом на этапе получения. В секции handlers описано, что при получении данных с ключом user будет вызвана функция create_user_from_raw_user в модуле my_module.lua. Представим, что мы написали такую функцию, и она возвращает объект типа User (в соответствии с загруженной в TGD моделью).
Далее попадаем в секцию storage. В ней говорится, что объект с ключом user будет сохранен в спейc User. При этом, если такой записи в спейсе нет, она будет добавлена. Если она уже есть, то она будет обновлена.
Далее следует конфигурирование output_processor – действий, которые будут совершены с объектом перед удалением из очереди. В нашем случает это репликация во внешнюю систему – БД PostgreSQL. Мы описываем, что при попадании в output_processor данный с ключом user будет вызвана функция save_user в модуле pg_replicator.lua. А чтобы она отработала корректно, мы должны сконфигурировать подключение к PostrgeSQL в секции odbc.
Листинг файла pg_replicator.lua
local odbc = require(‘odbc’)
local function save_user(user)
local res, err = odbc.execute(‘postgres’, “insert into my_scheme.user(name, age, sex) values (?, ?, ?)”,
{user.name, user.age, user.sex})
return res
end
Вот так всё просто.