Наша сеть построена на opensips 1.8. Основная нагрузка на opensips приходится при обработке регистраций, в отличие от звонков, на которые приходится гораздо меньшая нагрузка. Поэтому, когда в нашей сети количество регистраций перешагнуло определенный порог, было принято решение о горизонтальном масштабировании opensips. Так родился проект opensips-кластера.
Идея заключалась в создании максимально простого балансировщика, который будет распределять звонки между нодами кластера. Обработку звонков планировалось производить полностью на нодах.
Одновременно с этим, возникла идея о скрытии топологии. В opensips для этого есть два механизма.
Первый — недавно появившаяся функция topology_hiding() в модуле dialog.
Второй — модуль b2b_logic, реализующий полноценный b2b.
После испытаний, было принято окончательное решение использовать b2b_logic. Однако, мы столкнулись с ограничениями в его функциональности. Например, не было возможности полноценно работать с sip-заголовками (удалять, добавлять, изменять), а также не было режима прозрачной аутентификации. Аутентификация была возможна на сервере с b2b, но пробрасывать ее на следующий сервер b2b не умел. Были написаны соответствующие патчи к основному коду, которые разработчики opensips обещают включить в релиз 1.9.
Также следует отметить, что opensips построен так, что на одном запущенном экземпляре нельзя использовать b2b и прокси одновременно. В итоге, родилась схема, которая стала для нас рабочей.
B2B используется для балансировки sip-трафика между нодами кластера, скрывает топологию сети в обе стороны, нормализует и фильтрует некоторые sip-заголовки, отвечает за определение NAT, поддерживает SIP/UDP, SIP/TCP, SIP/TLS протоколы, а также STUN-сервер.
В качестве нод используются opensips в режиме прокси, они отвечают за сервис определения местоположения клиента (location), нормализуют, корректируют sip-заголовки от клиентов, на которых некорректно реализована поддержка sip-протокола, управляют RTP-проксированием и обрабатывают NAT.
В этой схеме SIP-пакет проходит от клиента к class5 софтсвитчу таким образом:
софтсвитч -> балансировщик -> нода -> балансировщик -> клиент
либо в обратном порядке.
Каждый opensips размещен на своем аппаратном сервере, между серверами настроена master-master репликация postgresql.
Конфигурационные файлы у нас генерируются с использованием m4. На каждом сервере лежит файл со статическими переменными, специфичными для данного сервера, а общие части конфигурации хранятся в git. В упрощенном виде, конфигурации привожу здесь.
balancer.cfg
####### Global Parameters ##############################################
debug=3
log_stderror=no
log_facility=LOG_LOCAL0
fork=yes
children=3
disable_tcp=no
# TCP
tcp_children=10
tcp_accept_aliases=yes
tcp_send_timeout=5
tcp_connect_timeout=5
tcp_max_connections=4096
tcp_poll_method=epoll_et
mhomed=1
port=5060
listen = udp:192.168.0.1:5060
listen = udp:8.8.8.8:5060
listen = tcp:192.168.0.1:5060
listen = tcp:8.8.8.8:5060
server_header="Server: Cool SBC"
user_agent_header="User-Agent: Cool SBC"
disable_core_dump=no
####### Modules Section ################################################
mpath="/usr/lib64/opensips/modules"
########################################################################
loadmodule "maxfwd.so"
########################################################################
modparam("maxfwd", "max_limit", 256)
########################################################################
loadmodule "sipmsgops.so"
########################################################################
########################################################################
loadmodule "textops.so"
########################################################################
########################################################################
loadmodule "stun.so"
########################################################################
modparam("stun","primary_ip","8.8.8.8")
modparam("stun","primary_port","5060")
modparam("stun","alternate_ip","192.168.0.1")
modparam("stun","alternate_port","3478")
########################################################################
loadmodule "mi_fifo.so"
########################################################################
modparam("mi_fifo", "fifo_name", "/tmp/opensips_fifo")
modparam("mi_fifo", "fifo_mode", 0666)
modparam("mi_fifo", "fifo_group", "opensips")
modparam("mi_fifo", "fifo_user", "opensips")
modparam("mi_fifo", "reply_dir", "/tmp/")
modparam("mi_fifo", "reply_indent", "t")
########################################################################
loadmodule "db_postgres.so"
########################################################################
########################################################################
loadmodule "avpops.so"
########################################################################
########################################################################
loadmodule "rr.so"
########################################################################
modparam("rr", "append_fromtag", 1)
modparam("rr", "enable_double_rr", 1)
modparam("rr", "add_username", 1)
########################################################################
loadmodule "sl.so"
########################################################################
modparam("sl", "enable_stats", 1)
########################################################################
loadmodule "tm.so"
########################################################################
modparam("tm", "fr_timer", 30)
modparam("tm", "fr_inv_timer", 120)
modparam("tm", "wt_timer", 5)
modparam("tm", "delete_timer", 2)
modparam("tm", "T1_timer", 500)
modparam("tm", "T2_timer", 4000)
modparam("tm", "ruri_matching", 1)
modparam("tm", "via1_matching", 1)
modparam("tm", "unix_tx_timeout", 2)
modparam("tm", "restart_fr_on_each_reply", 1)
modparam("tm", "pass_provisional_replies", 0)
modparam("tm", "syn_branch", 1)
modparam("tm", "onreply_avp_mode", 0)
modparam("tm", "disable_6xx_block", 0)
modparam("tm", "enable_stats", 1)
modparam("tm", "fr_timer_avp", "$avp(fr_timer)")
########################################################################
loadmodule "signaling.so"
########################################################################
########################################################################
loadmodule "path.so"
########################################################################
modparam("path", "use_received", 1)
modparam("path", "enable_double_path", 1)
########################################################################
loadmodule "domain.so"
########################################################################
modparam("domain", "db_url", "postgres://opensips:opensips@192.168.0.2/opensips")
modparam("domain", "db_mode", 1)
modparam("domain", "domain_table", "domain")
modparam("domain", "domain_col", "domain")
########################################################################
loadmodule "cachedb_local.so"
########################################################################
modparam("cachedb_local", "cache_table_size", 9)
modparam("cachedb_local", "cache_clean_period", 86400)
########################################################################
loadmodule "b2b_entities.so"
########################################################################
modparam("b2b_entities", "db_url", "postgres://opensips:opensips@192.168.0.2/opensips_balancer")
modparam("b2b_entities", "db_mode", 1)
modparam("b2b_entities", "server_hsize", 14)
modparam("b2b_entities", "client_hsize", 14)
modparam("b2b_entities", "script_req_route", "B2B_REQUEST")
modparam("b2b_entities", "script_reply_route", "B2B_REPLY")
modparam("b2b_entities", "b2b_key_prefix", "sbc")
########################################################################
loadmodule "b2b_logic.so"
########################################################################
modparam("b2b_logic", "db_url", "postgres://opensips:opensips@192.168.0.2/opensips_balancer")
modparam("b2b_logic", "db_mode", 1)
modparam("b2b_logic", "hash_size", 14)
modparam("b2b_logic", "cleanup_period", 60)
modparam("b2b_logic", "use_init_sdp", 1)
modparam("b2b_logic", "init_callid_hdr", "x-orig-ci")
modparam("b2b_logic", "max_duration", 86400)
modparam("b2b_logic", "b2bl_from_spec_param", "$avp(hdrfrom)")
modparam("b2b_logic", "custom_headers", "Replaces;x-orig-to;x-src-uri")
########################################################################
loadmodule "nathelper.so"
########################################################################
########################################################################
loadmodule "dispatcher.so"
########################################################################
modparam("dispatcher", "db_url", "postgres://opensips:opensips@192.168.0.2/opensips_balancer")
modparam("dispatcher", "flags", 2)
modparam("dispatcher", "force_dst", 0)
modparam("dispatcher", "use_default", 0)
modparam("dispatcher", "dst_avp", "$avp(disp_dst)")
modparam("dispatcher", "attrs_avp", "$avp(disp_attrs)")
modparam("dispatcher", "grp_avp", "$avp(disp_grp)")
modparam("dispatcher", "cnt_avp", "$avp(disp_cnt)")
modparam("dispatcher", "hash_pvar", "$avp(disp_hash)")
modparam("dispatcher", "ds_ping_method", "OPTIONS")
modparam("dispatcher", "ds_ping_from", "sip:balancer@cool.sip")
modparam("dispatcher", "ds_ping_interval", 1)
modparam("dispatcher", "ds_probing_sock", "udp:192.168.0.1:5060")
modparam("dispatcher", "ds_probing_threshhold", 1)
modparam("dispatcher", "ds_probing_mode", 0)
modparam("dispatcher", "options_reply_codes", "200")
modparam("dispatcher", "table_name", "dispatcher")
modparam("dispatcher", "setid_col", "setid")
modparam("dispatcher", "destination_col", "destination")
modparam("dispatcher", "flags_col", "flags")
modparam("dispatcher", "weight_col", "weight")
modparam("dispatcher", "attrs_col", "attrs")
modparam("dispatcher", "socket_col", "socket")
########################################################################
route {
$var(from_proxy) = ds_is_in_list("$si", "$sp");
$var(nolog) = 0;
if ( ($var(from_proxy) == 1) && (is_method("OPTIONS")) ) $var(nolog) = 1;
if ($var(nolog) == 0) xlog("L_INFO", "[MAIN] Incoming request (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n$mb");
force_rport();
remove_hf("Path");
route(VALIDATE);
route(PING);
if (is_method("REGISTER")) {
if (!add_path_received()) { # For proxy to know source ip address
xlog("L_ERR", "[MAIN] Cannot add path (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
send_reply("503", "Internal Path Error");
exit;
}
route(BALANCE);
$avp(fr_timer) = 3;
route(RELAY);
}
if (is_method("MESSAGE")) {
if ($var(from_proxy) == 1) {
# Topology hiding
remove_hf("Authorization");
remove_hf("Proxy-Authorization");
$du = $ru;
$rd = $fd;
} else {
append_hf("x-src-uri: sip:$si:$sp;transport=$protorn");
route(BALANCE);
}
route(RELAY);
}
if (is_method("ACK")) exit; # Must be an ACK after 401
if (is_method("SUBSCRIBE|PUBLISH")) {
xlog("L_ERR", "[MAIN] Method not supported (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
send_reply("501", "Method not supported here");
exit;
}
if (!is_method("INVITE|OPTIONS")) {
xlog("L_ERR", "[MAIN] Call leg/Transaction does not exist (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
send_reply("481", "Call leg/Transaction does not exist");
exit;
}
loose_route(); # Preloaded route (for path module)
if (!is_method("OPTIONS")) append_hf("x-orig-to: $hdr(To)rn");
if ($var(from_proxy) == 1) {
if (is_method("OPTIONS")) { # Ping from proxy to UAC
# Topology hiding
$ru = $du;
remove_hf("Via");
remove_hf("To");
append_hf("To: $durn", "From");
remove_hf("Call-ID");
append_hf("Call-ID: $(ci{s.md5})rn", "To");
route(RELAY);
}
xlog("L_INFO", "[MAIN] Request from proxy (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
} else {
xlog("L_INFO", "[MAIN] Request from UAC or commutator (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
if (route(FROM_CLASS5)) {
if ($fd == "192.168.0.1") { # Override INT_IP to default domain
$avp(hdrfrom) = $hdr(From);
avp_subst("$avp(hdrfrom)", "/<.*>//");
}
} else {
# Fix NATed contact and SDP in client requests
if (nat_uac_test("19")) {
xlog("L_INFO", "[MAIN] NAT detected, fixed contact (oldct=$ct) (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
fix_nated_contact();
}
if ( (has_body("application/sdp")) && (nat_uac_test("8")) ) {
xlog("L_INFO", "[MAIN] NAT detected, fixed SDP (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
fix_nated_sdp("10");
}
}
route(BALANCE);
append_hf("x-src-uri: sip:$si:$sp;transport=$protorn");
}
b2b_set_mode("a");
b2b_init_request("top hiding");
exit;
}
route[B2B_REQUEST] {
xlog("L_INFO", "[B2B_REQUEST] Incoming B2B request (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n$mb");
force_rport();
if (!ds_is_in_list("$si", "$sp")) { # From UAC or commutator
append_hf("x-src-uri: sip:$si:$sp;transport=$protorn");
}
xlog("L_INFO", "[B2B_REQUEST] Request leaving server (will be generated by b2b) (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
}
route[B2B_REPLY] {
xlog("L_INFO", "[B2B_REPLY] Incoming B2B reply (STATUS="$rs $rr" M=$rm IP=$si:$sp F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs)n$mb");
if (!ds_is_in_list("$si", "$sp")) { # From UAC or commutator
append_hf("x-src-uri: sip:$si:$sp;transport=$protorn");
} else {
remove_hf("x-orig-ci");
}
xlog("L_INFO", "[B2B_REPLY] Reply leaving server (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
}
route[BALANCE] {
$du = null; # Ignore preloaded routes
if (avp_check("$si","re/192.168./g")) {
$var(balance-dst) = "2"; # Internal network
} else {
$var(balance-dst) = "1"; # External network
}
if (!ds_select_dst("$(var(balance-dst){s.int})", "0")) {
xlog("L_ERR", "[BALANCE] No active gateways (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
send_reply("503", "Service unavailable" );
exit;
}
xlog("L_INFO", "[BALANCE] Balance packet to $dd:$dp (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
return;
}
route[RELAY] {
if ($var(nolog) == 0) xlog("L_INFO", "[RELAY] Request leaving server (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
t_on_reply("ON_REPLY");
t_on_failure("ON_FAIL");
if (!t_relay("0x01")) sl_reply_error();
exit;
}
route[VALIDATE] {
if ($ua =~ "friendly-scanner|sundayddr|sip-scan|iWar|sipsak") {
xlog("L_ERR", "[VALIDATE] Attack attempt - Request dropped (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
drop();
}
if (msg:len > max_len) {
xlog("L_ERR", "[VALIDATE] Message too big - Sending 513 Message Too Big (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
send_reply("513", "Message Too Big");
exit;
}
if (!mf_process_maxfwd_header("10")) {
xlog("L_ERR", "[VALIDATE] Too many hops - Sending 483 Too Many Hops (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
send_reply("483", "Too Many Hops");
exit;
}
if(!sipmsg_validate()) {
switch($retcode) {
case -1:
xlog("L_ERR", "[VALIDATE] The message is not RFC3261 compliant - request dropped (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
break;
case -2:
xlog("L_ERR", "[VALIDATE] Parsing error - request dropped (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
break;
case -3:
xlog("L_ERR", "[VALIDATE] Invalid SDP body - request dropped (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
break;
case -4:
xlog("L_ERR", "[VALIDATE] Invalid headers body - request dropped (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
break;
default:
xlog("L_ERR", "[VALIDATE] Undefined error - request dropped (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
}
drop();
}
return;
}
route[PING] {
if (is_method("PING")) {
xlog("L_INFO", "[PING] PING - Sending 200 OK (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
send_reply("200", "OK");
exit;
}
if (is_method("NOTIFY")) {
if ($hdr(Event) == "keep-alive") {
xlog("L_INFO", "[PING] NOTIFY - Sending 200 OK (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
send_reply("200", "OK");
exit;
}
}
if ($var(from_proxy) != 1) {
if (is_method("OPTIONS")) {
xlog("L_INFO", "[PING] OPTIONS - Sending 200 OK (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");
send_reply("200", "OK");
exit;
}
}
return;
}
route[FROM_CLASS5] {
if ($si == "192.168.0.3") return(1);
return(-1);
}
route[TO_CLASS5] {
if ($dd == "192.168.0.3") return(1);
return(-1);
}
onreply_route { # GLOBAL
# NAT processing
if (!ds_is_in_list("$si", "$sp")) { # From UAC or commutator
if (!route(FROM_CLASS5)) {
# Fix NATed contact and SDP in client replies
if (nat_uac_test("65")) fix_nated_contact();
if ( (has_body("application/sdp")) && (nat_uac_test("8")) ) fix_nated_sdp("10");
}
}
}
onreply_route[ON_REPLY] {
$var(from_proxy) = ds_is_in_list("$si", "$sp");
$var(nolog) = 0;
if ( ($var(from_proxy) != 1) && (is_method("OPTIONS")) ) $var(nolog) = 1;
if ($var(nolog) == 0) xlog("L_INFO", "[ON_REPLY] Incoming reply (STATUS="$rs $rr" M=$rm IP=$si:$sp F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs)n$mb");
if ($var(from_proxy) != 1) { # From UAC or commutator
# Fix via in OPTIONS ping reply
if ( (is_method("OPTIONS")) && ($fu == "sip:pinger@cool.sip") ) insert_hf("Via: SIP/2.0/UDP $si:$sprn");
} else {
if (!route(TO_CLASS5)) {
if (is_method("MESSAGE")) remove_hf("Record-Route");
}
}
}
failure_route[ON_FAIL] {
if (!t_check_status("408")) return;
if (ds_is_in_list("$si", "$sp")) return; # OPTIONS pings
xlog("L_ERR", "[ON_FAIL] Timeout for current gateway (M=$rm IP=$si:$sp F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs)n");
ds_mark_dst("p"); # Set gw to probing state
if (ds_next_dst()) {
xlog("L_INFO", "[ON_FAIL] Next gateway is $dd:$dp (M=$rm IP=$si:$sp F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs)n");
cache_store("local", "proxy;$ci", "$du");
route(RELAY);
} else {
xlog("L_ERR", "[ON_FAIL] No more gateways (M=$rm IP=$si:$sp F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs)n");
send_reply("503", "Service unavailable" );
exit;
}
}
local_route {
if (is_method("OPTIONS")) return;
if (is_present_hf("x-orig-ci")) { # Packet from B2B
xlog("L_INFO", "[LOCAL] B2B generated request (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n$mbn");
if ($hdr(x-orig-to)) { # Restore original To header after b2b
remove_hf("To");
append_hf("To: $hdr(x-orig-to)rn", "From");
remove_hf("x-orig-to");
}
if (route(TO_CLASS5)) {
remove_hf("Authorization");
remove_hf("Proxy-Authorization");
}
if (!ds_is_in_list("$dd", "$dp")) { # Not to proxy
remove_hf("x-orig-ci");
remove_hf("x-src-uri");
}
xlog("L_INFO", "[LOCAL] Request leaving server (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci origID=$hdr(Init-CallID) cseq=$cs UA=$ua)n");
} else {
xlog("L_INFO", "[LOCAL] Generated request (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n$mbn");
xlog("L_INFO", "[LOCAL] Request leaving server (M=$rm IP=$si:$sp RURI=$ru DURI=$du F=$fu T=$tu oP=$oP pr=$pr dP=$dP rP=$rP ID=$ci cseq=$cs UA=$ua)n");