Syncing MySQL Data to Elasticsearch with Logstash

Installing Logstash

Logstash download page: https://www.elastic.co/downloads/logstash (only the latest version is listed there); past releases are at https://www.elastic.co/downloads/past-releases#logstash.

After downloading and extracting, a simple edit to logstash.conf is enough to get started (the config directory contains a logstash-sample.conf file; copy it to logstash.conf, or any other name you like). If your version is earlier than 7.6, you also need to install the logstash-integration-jdbc plugin; from 7.6 onwards the plugin is bundled with Logstash and no extra installation is needed.
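
If you do need to install the plugin manually, the bundled plugin manager can do it; run something like this from the Logstash home directory:

bin/logstash-plugin install logstash-integration-jdbc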

Configuration

Input configuration

Open the default config file, delete the beats block inside input, and insert a jdbc block:

jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://<database host>:3306/<database name>"
    jdbc_user => "<database user>"
    jdbc_password => "<database password>"
    schedule => "* * * * *"    # same syntax as a Linux crontab entry
    statement => "<SQL that selects the data to sync to Elasticsearch>"
    use_column_value => true
    tracking_column => "update_time"
    tracking_column_type => "timestamp"
  }
  • use_column_value

A boolean that affects the value of sql_last_value. The default is false, in which case sql_last_value holds the time of the previous run (initially midnight, 1970-01-01). When set to true, sql_last_value instead holds the last value of the tracking_column field; for example, with tracking_column => "id" it would be the id from the previous run.

  • tracking_column

The database column used in the SQL condition, such as a primary key or an update time; see the example after this list.

  • tracking_column_type

The type of the column above. Two values are allowed, timestamp and numeric, with numeric as the default. It also affects sql_last_value: if the update_time column above actually stores a numeric Unix timestamp but the type is set to timestamp, sql_last_value starts at 1970-01-01 and stays stuck at that initial value; set it to numeric and the initial value is 0.
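
For example, assuming a hypothetical articles table with an update_time column, the statement can reference sql_last_value so that each run only pulls rows changed since the previous one:

statement => "SELECT * FROM articles WHERE update_time > :sql_last_value ORDER BY update_time ASC"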

Output configuration

Default configuration:

elasticsearch {
   hosts => ["http://localhost:9200"]
   index => ""
   document_id => "%{<field to use as the document ID>}"
   #user => "elastic"
   #password => "changeme"
 }

Change hosts to your own Elasticsearch address. The document_type parameter still exists, but by default data is written under _doc. Mapping types have been discouraged since Elasticsearch 6.0, are deprecated in 7.x, and will be removed in 8.0. See removal of mapping types for details.

This is already enough for basic data syncing; for anything else, adjust the settings according to the parameter reference documentation.

  • Startup

To start Logstash, go into the bin directory and run:

./logstash -f /path/to/config/xxx.conf

Logstash configuration notes

A Logstash configuration has three parts: input, filter, and output.
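
Putting the three parts together, a minimal pipeline skeleton looks roughly like this (the mutate filter is only an illustrative example; the jdbc and elasticsearch blocks are the ones shown above):

input {
  jdbc {
    # the jdbc settings shown above
  }
}

filter {
  # optional; for example, drop fields that should not be indexed
  mutate {
    remove_field => ["@version"]
  }
}

output {
  elasticsearch {
    # the elasticsearch settings shown above
  }
}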

  • The path set by last_run_metadata_path in the jdbc input must be readable and writable. Without write permission, sql_last_value still tracks the last value during a run, but after a restart it starts from 0 again.

  • On startup you may see:

WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.

Append useSSL=false&verifyServerCertificate=false to the jdbc_connection_string to fix this.

  • A MySQL tinyint(1) column is converted to a boolean when read over JDBC. Likewise, appending tinyInt1isBit=false&transformedBitIsBoolean=false to the jdbc_connection_string keeps the original value from being converted; see the combined example below.
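
Combining the parameters from the last two points, a connection string might look like this (host, port and database name are placeholders):

jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb?useSSL=false&verifyServerCertificate=false&tinyInt1isBit=false&transformedBitIsBoolean=false"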

Writing PHP Extensions with Zephir

PHP is written in C. To write a PHP extension the traditional way, you have to understand the PHP source and write the extension in C, which is tedious and has a high barrier to entry. With Zephir, writing a PHP extension is almost as easy as writing PHP code.

Installing Zephir

  • Install the Zephir parser

First, install the prerequisites:

sudo yum install php-devel gcc make re2c autoconf automake

which gives:

[root@centos8 zeno]# sudo yum install php-devel gcc make re2c autoconf automake
Last metadata expiration check: 1:03:17 ago on Fri 31 Jul 2020 04:20:32 PM CST.
Package gcc-8.3.1-5.el8.0.2.x86_64 is already installed.
Package make-1:4.2.1-10.el8.x86_64 is already installed.
No match for argument: re2c
Package autoconf-2.69-27.el8.noarch is already installed.
Package automake-1.16.1-6.el8.noarch is already installed.
Error: Unable to find a match: re2c

re2c is not available from the package manager, so download and install it manually:

wget https://github.com/skvadrik/re2c/archive/2.0.1.tar.gz

This leaves a 2.0.1.tar.gz file locally; extract it and enter the directory:

tar xzf 2.0.1.tar.gz
cd re2c-2.0.1/

The directory contains:

[zeno@centos8 re2c-2.0.1]$ ls
add-release.txt benchmarks build cmake configure.ac examples include libre2c_old
Makefile.am NO_WARRANTY release.sh sf-cheatsheet test
autogen.sh bootstrap CHANGELOG CMakeLists.txt doc fuzz lib LICENSE Makefile.lib.am README.md run_tests.sh.in src

Run autoreconf:

autoreconf -i -W all

List the directory again:

[zeno@centos8 re2c-2.0.1]$ ls
aclocal.m4 autogen.sh benchmarks build CHANGELOG CMakeLists.txt configure doc fuzz lib LICENSE Makefile.am Makefile.lib.am README.md run_tests.sh.in src
add-release.txt autom4te.cache bootstrap build-aux cmake config.h.in configure.ac examples
include libre2c_old m4 Makefile.in NO_WARRANTY release.sh sf-cheatsheet test

A configure script is now present. Build and install:

./configure && sudo make && sudo make install

Everything needed is now in place. Install the Zephir parser:

git clone git://github.com/phalcon/php-zephir-parser.git
cd php-zephir-parser
phpize
./configure --with-php-config=/usr/local/php/bin/php-config
make
sudo make install

Enable the extension in php.ini:

extension=zephir_parser.so

Next comes Zephir itself. This step is very simple: download the zephir.phar file from https://github.com/phalcon/zephir/releases/latest, move it into /usr/bin/ or /usr/local/bin, and make it executable:

mv zephir.phar /usr/bin/zephir
chmod +x /usr/bin/zephir

Zephir is now installed. To test it, run zephir or zephir help; if the usage information appears, the installation succeeded.

Extension requirements and initialization

When building APIs we often need to display times in formats like "xx seconds ago" or "xx days ago". Let's write an extension that takes a timestamp and returns it converted into that kind of format.

First, initialize the project:

# zephir automatically converts uppercase to lowercase here, so just use lowercase
zephir init timehelper
cd timehelper/timehelper

In this directory create a file named helper.zep and start with a test method that prints hello world:

namespace TimeHelper;

class Helper {

    public static function say(){
        echo "hello world!";
    }
}

Then run zephir build to compile and install the extension. On success you get:

Preparing for PHP compilation...
Preparing configuration file...
Compiling...
Installing...

Extension installed.
Add "extension=timehelper.so" to your php.ini

! [NOTE] Don't forget to restart your web server

Now just add extension=timehelper.so to php.ini and the extension is ready to use.
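
To double-check that the extension is actually loaded, you can list PHP's modules, for example:

php -m | grep timehelper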

Testing the extension

The timehelper extension is installed; how do we use it? Create a new PHP file containing this line:

<?php
TimeHelper\Helper::say();

Save it as test.php and run it with php test.php; if it prints hello world, everything works.

Writing the extension


namespace TimeHelper;

class Helper {

    public static function say(){
        echo "hello world!";
    }
    
    // convert a Unix timestamp into a human readable "xx ago" string,
    // or return false for future or negative timestamps
    public function before(int! sec) -> string|bool {
        if sec > time() || sec < 0 {
            return false;
        }
        
        int diff;
        let diff = time() - sec;
        if diff <= 60 {
            return diff . "秒前";
        }

        if diff > 60 && diff < 3600 {
            return this->getMin(diff);
        }

        if diff >= 3600 && diff < 86400 {
            return this->getHour(diff);
        }

        if diff >= 86400 && diff < 2592000 {
            return this->getDay(diff);
        }

        if diff >= 2592000 && diff < 2592000 * 12 {
            return this->getMonth(diff);
        }
        
        return this->getYear(diff);
    }

    private function getMin(int! sec)-> string{
            if sec === 0 {
                return "";
            }

            int min = intval(sec / 60);
            let sec = sec % 60;
            if sec > 0 {
                return min . "分" . sec . "秒前";
            }

            return min . "分前";
    }

    private function getHour(int! sec)-> string {
            int hour = intval(sec / 3600);
            int diff = sec % 3600;
            var min = this->getMin(diff);

            if strlen(min) {
                return hour . "小时" . min;
            }

            return hour . "小时前";
    }

    private function getDay(int! sec)->string {
            int day = intval(sec / 86400);

            return day . "天前";
    }

    private function getMonth(int! sec)-> string {
            int month = intval(sec / 2592000);

            return month . "月前";
    }

    private function getYear(int! sec) -> string {
            int year = intval(sec / 2592000 / 365);

            return year . "年前";
    }
}
  • PHP test script

Modify the earlier test.php as follows:

<?php
$helper = new TimeHelper\Helper();

// xx seconds ago
$sec = time() - 50;
echo $helper->before($sec), "\n";

// xx minutes xx seconds ago
$sec = time() - 73;
echo $helper->before($sec), "\n";

// xx hours ago
$sec = time() - 3950;
echo $helper->before($sec), "\n";

// xx days ago
$sec = time() - 86600;
echo $helper->before($sec), "\n";

// xx months ago
$sec = time() - 2597000;
echo $helper->before($sec), "\n";

// xx years ago
$sec = time() - 948672000;
echo $helper->before($sec), "\n";

Running the file with PHP produces:

50秒前
1分13秒前
1小时5分50秒前
1天前
1月前
1年前

Afterword

  • PHP built-in functions can be used directly in Zephir code, such as time and intval above.
  • A Zephir method can declare several possible return types, separated by |.
  • Parameters in Zephir may be left untyped:
public function say(str){}

They can also be typed (write the type with no exclamation mark):

public function say(string str){}

Or strictly typed (with an exclamation mark after the type):

public function say(string! str){}

The difference between the last two: with a plain type, a mismatched argument is converted to the declared type and an error is raised only if the conversion fails; with the strict form, no conversion is attempted and the types must match exactly. See the sketch after this list.

  • I was previously working in a virtual machine under Windows, with the code in a directory shared between Linux and Windows. Running zephir build from the Linux terminal failed, because the underlying gcc build was treated as a cross-compilation and needed a --host argument, which the zephir command cannot yet pass through to gcc. Moving the code out of the shared directory into an ordinary Linux directory made the build succeed.
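
For instance, assuming a hypothetical Zephir method greet declared once as public function greet(string name) and once as public function greet(string! name), calling it from PHP behaves roughly like this:

<?php
$h = new TimeHelper\Helper();

// with `string name`: the integer is converted to the string "123" and the call succeeds
$h->greet(123);

// with `string! name`: no conversion is attempted and passing 123 raises a type error;
// only an actual string is accepted
$h->greet("zeno");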

Running Multiple PHP cURL Requests Concurrently

First, a simple PHP cURL request:

$ch = curl_init('http://example.com');
curl_setopt($ch, CURLOPT_HEADER, 0);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_exec($ch);
curl_close($ch);

With several requests, the same steps have to be repeated for each one:

$ch = curl_init('http://example1.com');
curl_setopt($ch, CURLOPT_HEADER, 0);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_exec($ch);
curl_close($ch);

$ch = curl_init('http://example2.com');
curl_setopt($ch, CURLOPT_HEADER, 0);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_exec($ch);
curl_close($ch);

// more requests...

Done this way, the total time is the sum of the individual request times: five requests taking one second each need five seconds. Executed concurrently, the total time is roughly that of the slowest request, so the same five requests finish in about one second.

// initialize each curl handle as before
$ch1 = curl_init('http://example1.com');
curl_setopt($ch1, CURLOPT_HEADER, 0);
curl_setopt($ch1, CURLOPT_RETURNTRANSFER, 1);
// note that curl_exec is not called here
$ch2 = curl_init('http://example2.com');
curl_setopt($ch2, CURLOPT_HEADER, 0);
curl_setopt($ch2, CURLOPT_RETURNTRANSFER, 1);
// initialize the multi handle
$mh = curl_multi_init();
// add each of the handles created above
curl_multi_add_handle($mh, $ch1);
curl_multi_add_handle($mh, $ch2);

// run the requests
$active = null;
do {
    $status = curl_multi_exec($mh, $active);
} while ($status === CURLM_CALL_MULTI_PERFORM);
while ($active && $status == CURLM_OK) {
    // this can return -1 even while requests are still in progress
    // see: https://www.php.net/manual/en/function.curl-multi-select.php#115381
    if (curl_multi_select($mh) === -1) {
        usleep(100);
    }

    do {
        $status = curl_multi_exec($mh, $active);
    } while ($status === CURLM_CALL_MULTI_PERFORM);
}

// collect the responses if needed
$res[] = curl_multi_getcontent($ch1);
$res[] = curl_multi_getcontent($ch2);

// remove the handles from the multi handle
curl_multi_remove_handle($mh, $ch1);
curl_multi_remove_handle($mh, $ch2);

// close the multi handle
curl_multi_close($mh);

In real business code, the URLs and request parameters usually arrive as an array, so the handles can be created and collected in a loop, as sketched below.
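
A rough sketch of that idea (the function name and option handling here are only illustrative):

<?php
// fetch several URLs concurrently and return their bodies keyed like the input array
function fetchAll(array $urls): array
{
    $mh = curl_multi_init();
    $handles = [];
    foreach ($urls as $key => $url) {
        $ch = curl_init($url);
        curl_setopt($ch, CURLOPT_HEADER, 0);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
        curl_multi_add_handle($mh, $ch);
        $handles[$key] = $ch;
    }

    // drive all transfers until none are still active
    $active = null;
    do {
        $status = curl_multi_exec($mh, $active);
        if ($active) {
            curl_multi_select($mh);
        }
    } while ($active && $status === CURLM_OK);

    // collect the responses and clean up
    $results = [];
    foreach ($handles as $key => $ch) {
        $results[$key] = curl_multi_getcontent($ch);
        curl_multi_remove_handle($mh, $ch);
    }
    curl_multi_close($mh);

    return $results;
}

$responses = fetchAll(['http://example1.com', 'http://example2.com']);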

Collected Elasticsearch Problems

  • No alive nodes found in your cluster

When a client reports this, check whether Elasticsearch has died, for example with:

ps -aux | grep elasticsearch

# output:
vagrant  13508  0.0  0.0  14228   928 pts/0    S+   19:10   0:00 grep --color=auto elasticsearch

If only the grep process shows up, restart the service. Also note that when started as a daemon, any startup error is not visible; only this line is shown:

OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
  • mapper [title] of different type, current_type [keyword], merged_type [text]

The full error looks something like this:

Array
(
    [error] => Array
        (
            [root_cause] => Array
                (
                    [0] => Array
                        (
                            [type] => illegal_argument_exception
                            [reason] => mapper [title] of different type, current_type [keyword], merged_type [text]
                        )

                )

            [type] => illegal_argument_exception
            [reason] => mapper [title] of different type, current_type [keyword], merged_type [text]
        )

    [status] => 400
)

How does this happen? The title field was first mapped as keyword, and then an attempt was made to map title as text, which triggers the error. What can be done? The explanation here is that changing an existing field would invalidate data that has already been indexed. If the mapping was simply wrong and has to change, the only option is to create a new index with the new mappings and then use the reindex API to index the data into it.
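
A minimal sketch of that flow (the index names here are hypothetical):

PUT /articles_v2
{
  "mappings": {
    "properties": {
      "title": { "type": "text" }
    }
  }
}

POST /_reindex
{
  "source": { "index": "articles" },
  "dest":   { "index": "articles_v2" }
}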

  • master not discovered yet, this node has not previously joined a bootstrapped cluster, and [cluster.initial_master_nodes] is empty on this node

This problem appeared while I was experimenting with the path.data setting. Everything had been running fine before and nothing else had been changed, so that setting was clearly the cause; after commenting it out again, startup succeeded. So why does configuring path.data make startup fail? Judging from the message, cluster.initial_master_nodes needs a value. I only run a single local node, so I first tried giving it an empty value, the same way as discovery.seed_hosts: [], but the error remained. I then uncommented node.name in the Node section, keeping the default node-1, and also uncommented cluster.initial_master_nodes in the Discovery section, setting it to node-1; after that the node started successfully.

  • Native controller process has stopped - no new native processes can be started

Elasticsearch was installed yesterday, but starting it today fails:

...
ERROR: [1] bootstrap checks failed
[1]: max number of threads [2048] for user [vagrant] is too low, increase to at least [4096]
[2]: the default discovery settings are unsuitable for production use; at least one of [discovery.seed_hosts, discovery.seed_providers, cluster.initial_master_nodes] must be configured
[2019-11-15T14:46:51,596][INFO ][o.e.n.Node               ] [machine2] stopping ...
[2019-11-15T14:46:51,640][INFO ][o.e.n.Node               ] [machine2] stopped
[2019-11-15T14:46:51,640][INFO ][o.e.n.Node               ] [machine2] closing ...
[2019-11-15T14:46:51,666][INFO ][o.e.n.Node               ] [machine2] closed
[2019-11-15T14:46:51,669][INFO ][o.e.x.m.p.NativeController] [machine2] Native controller process has stopped - no new native processes can be started

For errors like this, don't focus on the last line. Look at the ERROR section above it, which lists two problems.

For the first one, the page here gives two fixes:

  1. Set the limit with the Linux command ulimit -u xxxx.
  2. Edit the nproc value for the current user in /etc/security/limits.conf.

As a side note, nproc stands for max number of processes, which is explained in the comments near the top of that file.
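
For example, the limits.conf entries for that user might look like this (the user name and limit value are taken from the error message above and are only illustrative):

# /etc/security/limits.conf
vagrant    soft    nproc    4096
vagrant    hard    nproc    4096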

For the second problem, see here: with no network configuration at all, Elasticsearch binds to the loopback address (127.0.0.1) and scans ports 9300 to 9305 trying to connect to other nodes (which makes clustering convenient). The problem appeared because I changed the bound IP address yesterday. So it can be solved by either:

  1. Reverting yesterday's change and restoring the default network IP configuration, or
  2. Setting discovery.seed_hosts: [] in the Discovery section of $ES_HOME/config/elasticsearch.yml.

Installing Elasticsearch

The official documentation is here.

  1. Download
vagrant@machine2:~$ curl -L -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.4.2-linux-x86_64.tar.gz
  2. Extract
vagrant@machine2:~$ tar xzf elasticsearch-7.4.2-linux-x86_64.tar.gz
  3. Start
vagrant@machine2:~$ cd elasticsearch-7.4.2/bin/
vagrant@machine2:~/elasticsearch-7.4.2/bin$ ./elasticsearch

A problem appears:

could not find java in JAVA_HOME or bundled at /usr/lib/jmv/java-8-openjdk-amd64/bin/java

Check the environment variable:

echo $JAVA_HOME
# output:
/usr/lib/jmv/java-8-openjdk-amd64

Note this passage in the documentation:

Elasticsearch includes a bundled version of OpenJDK from the JDK maintainers (GPLv2+CE). To use your own version of Java, see the JVM version requirements

In other words, a JDK is bundled; since I had installed my own JDK earlier, extra configuration is needed. The link leads to further documentation:

Elasticsearch is built using Java, and includes a bundled version of OpenJDK from the JDK maintainers (GPLv2+CE) within each distribution. The bundled JVM is the recommended JVM and is located within the jdk directory of the Elasticsearch home directory.

To use your own version of Java, set the JAVA_HOME environment variable. If you must use a version of Java that is different from the bundled JVM, we recommend using a supported LTS version of Java. Elasticsearch will refuse to start if a known-bad version of Java is used. The bundled JVM directory may be removed when using your own JVM.

So the bundled JDK is the recommended one, without much more detail. There is indeed a jdk directory inside the Elasticsearch directory, so point the $JAVA_HOME environment variable at it:

export JAVA_HOME=/home/vagrant/elasticsearch-7.4.2/jdk

Start it again: a long stream of startup output appears, so it worked. Verify that it is running:

curl 127.0.0.1:9200

# output:
{
  "name" : "iZ23pskgys8Z",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "J2SyY5cZRayLwC1lsPaoGA",
  "version" : {
    "number" : "7.4.2",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "2f90bbf7b93631e52bafb59b3b049cb44ec25e96",
    "build_date" : "2019-10-28T20:40:44.881551Z",
    "build_snapshot" : false,
    "lucene_version" : "8.2.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}

Getting a response like the one above means the installation succeeded.

  • Installation in a Vagrant environment

My local setup is Vagrant plus VirtualBox. After installing, edit elasticsearch.yml so that the network host is bound to the Vagrant VM's IP, which in my case is 192.168.56.102. Change line 55:

network.host: 192.168.56.102

Change line 68:

discovery.seed_hosts: []

With that, opening http://192.168.56.102:9200 in a browser on Windows works as well.

Starting NFS mountd: rpc.mountd: svc_tli_create: could not open connection for udp6

After installing NFS, restart the service:

service nfs restart

which reports:

Starting kernel based NFS server: idmapd mountdrpc.mountd: svc_tli_create:
could not open connection for udp6
rpc.mountd: svc_tli_create: could not open connection for tcp6
rpc.mountd: svc_tli_create: could not open connection for udp6
rpc.mountd: svc_tli_create: could not open connection for tcp6
rpc.mountd: svc_tli_create: could not open connection for udp6
rpc.mountd: svc_tli_create: could not open connection for tcp6
statd nfsdrpc.nfsd: address family inet6 not supported by protocol TCP
sm-notify
done

udp6 is for IPv6, so the IPv6 entries in /etc/netconfig can be commented out:

udp        tpi_clts      v     inet     udp     -       -
tcp        tpi_cots_ord  v     inet     tcp     -       -
#udp6       tpi_clts      v     inet6    udp     -       -
#tcp6       tpi_cots_ord  v     inet6    tcp     -       -
rawip      tpi_raw       -     inet      -      -       -
local      tpi_cots_ord  -     loopback  -      -       -
unix       tpi_cots_ord  -     loopback  -      -       -

Setting the Content-Type Header for Mini Program Network Requests

A WeChat mini program makes HTTP requests by calling the wx.request function. The function takes an object in which the URL, header, method and so on can be configured. When we send a POST request this way, however, reading the parameters on the server side with $_POST, or with a framework call such as Phalcon's $this->request->getPost(), returns nothing. Reading the raw request body does work:

$params = file_get_contents('php://input')

or, in Phalcon:

$params = $this->request->getJsonRawBody();

Both of these return the parameters. Why can a POST request not be read from $_POST? Looking at the mini program documentation, the content-type in the request header is application/json, which is the default. The PHP documentation then explains:

When the Content-Type of an HTTP POST request is application/x-www-form-urlencoded or multipart/form-data, the variables are passed to the current script as an associative array.

In other words, set the content-type in the mini program's request header to application/x-www-form-urlencoded, and the parameters can then be read from $_POST.
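
A minimal sketch of such a request (the URL and data are placeholders):

wx.request({
  url: 'https://example.com/api/save',   // hypothetical endpoint
  method: 'POST',
  header: {
    'content-type': 'application/x-www-form-urlencoded'
  },
  data: {
    id: 1,
    name: 'zeno'
  },
  success(res) {
    console.log(res.data)
  }
})

With this header set, the parameters show up in $_POST on the server side.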