Node.js: converting an object back into source text, dynamically executing string code, and dynamically generating the r.js build config when packaging with RequireJS
SIMONE, Tue, 01 Nov 2016. http://www.dentisthealthcenter.com/wangxinsh55/archive/2016/11/01/431944.html

var path = require('path');
var fs = require('fs');
var vm = require('vm');
var os = require('os');
/**
 * Build an indentation string of `indent` tab characters.
 */
function toIndent(indent) {
    var s = [];
    for (var i = 0; i < indent; i++) {
        s.push('\t');
    }
    return s.join('');
}
/**
 * Convert an array into raw JavaScript source text.
 */
function array2string(arr, indent) {
    var s = ['[', os.EOL], hasProp = false;
    for (var i = 0; i < arr.length; i++) {
        if (!hasProp) {
            hasProp = true;
        }
        s.push(toIndent(indent + 1));
        var item = arr[i];
        var itemtp = typeof(item);
        if (itemtp === 'object') {
            if (item instanceof Array) {
                s.push(array2string(item, indent + 1));
            } else {
                // drop the preceding end-of-line and the indent just pushed,
                // so the nested object starts on the current line
                s.splice(s.length - 2, 2);
                s.push(object2strng(item, indent).trim());
            }
        } else {
            s.push(JSON.stringify(item));
        }
        s.push(',');
        s.push(os.EOL);
    }
    if (hasProp) {
        // remove the trailing comma that sits just before the final EOL
        s.splice(s.length - 2, 1);
    }
    s.push(toIndent(indent));
    s.push(']');
    return s.join('');
}
/**
 * Convert an object into raw JavaScript source text.
 */
function object2strng(obj, indent) {
    var s = ['{', os.EOL], hasProp = false;
    for (var o in obj) {
        if (!hasProp) {
            hasProp = true;
        }
        s.push(toIndent(indent + 1));
        s.push(JSON.stringify(o));
        s.push(':');
        // The post is cut off at this point; the rest of the loop below is a
        // reconstruction by analogy with array2string above, not the author's code.
        var val = obj[o];
        if (typeof(val) === 'object') {
            s.push(val instanceof Array ? array2string(val, indent + 1)
                                        : object2strng(val, indent + 1));
        } else {
            s.push(JSON.stringify(val));
        }
        s.push(',');
        s.push(os.EOL);
    }
    if (hasProp) {
        // remove the trailing comma that sits just before the final EOL
        s.splice(s.length - 2, 1);
    }
    s.push(toIndent(indent));
    s.push('}');
    return s.join('');
}
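The surviving fragment never reaches the part the title promises: evaluating a configuration file as string code and re-emitting it for an r.js build. A minimal sketch of that idea, using the vm module required above; the file names and the shape of the build object are illustrative assumptions, not taken from the original post:

// Read an existing r.js build config (assumed to be of the form "({ ... })"),
// evaluate it as code, adjust it, and write it back out with object2strng.
var source = fs.readFileSync(path.join(__dirname, 'build.js'), 'utf-8');
var build = vm.runInThisContext('(' + source + ')', { filename: 'build.js' });
build.optimize = 'uglify2'; // example of a dynamic adjustment
fs.writeFileSync(path.join(__dirname, 'build.generated.js'),
                 '(' + object2strng(build, 0) + ')');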
Modifying Sublime Text 3 on Ubuntu MATE to work with a Chinese input method
SIMONE, Fri, 19 Aug 2016. http://www.dentisthealthcenter.com/wangxinsh55/archive/2016/08/19/431643.html
Notes on a few of the important settings (an example snippet follows the list):
+ [realms].kdc : the name of the host running a KDC for that realm.
+ [realms].admin_server : identifies the host where the administration server is running. Typically this is the Master Kerberos server.
+ [domain_realm] : provides a translation from a hostname to the Kerberos realm name for the service provided by that host.
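These settings live in /etc/krb5.conf. A minimal sketch for an EXAMPLE.COM realm, with kdc01/kdc02 as placeholder hostnames rather than values from the original post:

[realms]
    EXAMPLE.COM = {
        kdc = kdc01.example.com
        kdc = kdc02.example.com
        admin_server = kdc01.example.com
    }

[domain_realm]
    .example.com = EXAMPLE.COM
    example.com = EXAMPLE.COM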
Kerberos is a network authentication system based on the principle of a trusted third party, the other two parties being the user and the service the user wishes to authenticate to. Not all services and applications can use Kerberos, but for those that can, it brings the network environment one step closer to being Single Sign On (SSO).
This section covers installation and configuration of a Kerberos server, and some example client configurations.
Overview
If you are new to Kerberos there are a few terms that are good to understand before setting up a Kerberos server. Most of the terms will relate to things you may be familiar with in other environments:
Principal: any users, computers, and services provided by servers need to be defined as Kerberos Principals.
Instances: are used for service principals and special administrative principals.
Realms: the unique realm of control provided by the Kerberos installation. Usually the DNS domain converted to uppercase (EXAMPLE.COM).
Key Distribution Center: (KDC) consist of three parts, a database of all principals, the authentication server, and the ticket granting server. For each realm there must be at least one KDC.
Ticket Granting Ticket: issued by the Authentication Server (AS), the Ticket Granting Ticket (TGT) is encrypted with the user's password, which is known only to the user and the KDC.
Ticket Granting Server: (TGS) issues service tickets to clients upon request.
Tickets: confirm the identity of the two principals. One principal being a user and the other a service requested by the user. Tickets establish an encryption key used for secure communication during the authenticated session.
Keytab Files: are files extracted from the KDC principal database and contain the encryption key for a service or host.
To put the pieces together, a Realm has at least one KDC, preferably two for redundancy, which contains a database of Principals. When a user principal logs into a workstation, configured for Kerberos authentication, the KDC issues a Ticket Granting Ticket (TGT). If the user supplied credentials match, the user is authenticated and can then request tickets for Kerberized services from the Ticket Granting Server (TGS). The service tickets allow the user to authenticate to the service without entering another username and password.
Kerberos Server
Installation
Before installing the Kerberos server a properly configured DNS server is needed for your domain. Since the Kerberos Realm by convention matches the domain name, this section uses the example.com domain configured in the section called “Primary Master”.
Also, Kerberos is a time sensitive protocol. So if the local system time between a client machine and the server differs by more than five minutes (by default), the workstation will not be able to authenticate. To correct the problem all hosts should have their time synchronized using the Network Time Protocol (NTP). For details on setting up NTP see the section called “Time Synchronisation with NTP”.
The first step in installing a Kerberos Realm is to install the krb5-kdc and krb5-admin-server packages. From a terminal enter:
sudo apt-get install krb5-kdc krb5-admin-server
At the end of the install you will be asked to supply hostnames for the realm's Kerberos and Admin servers, which may or may not be the same machine.
Next, create the new realm with the krb5_newrealm utility:
sudo krb5_newrealm
Configuration
The questions asked during installation are used to configure the /etc/krb5.conf file. If you need to adjust the Key Distribution Center (KDC) settings simply edit the file and restart the krb5-kdc daemon.
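For example, following the init script style used elsewhere in this post, restarting the KDC after editing the file would be:

sudo /etc/init.d/krb5-kdc restart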
Now that the KDC is running, an admin user is needed. It is recommended to use a different username from your everyday username. Using the kadmin.local utility, in a terminal prompt enter:
sudo kadmin.local
Authenticating as principal root/admin@EXAMPLE.COM with password.
kadmin.local: addprinc steve/admin
WARNING: no policy specified for steve/admin@EXAMPLE.COM; defaulting to no policy
Enter password for principal "steve/admin@EXAMPLE.COM":
Re-enter password for principal "steve/admin@EXAMPLE.COM":
Principal "steve/admin@EXAMPLE.COM" created.
kadmin.local: quit
In the above example steve is the Principal, /admin is an Instance, and @EXAMPLE.COM signifies the realm. The "every day" Principal would be steve@EXAMPLE.COM, and should have only normal user rights.
Replace EXAMPLE.COM and steve with your Realm and admin username.
Next, the new admin user needs to have the appropriate Access Control List (ACL) permissions. The permissions are configured in the /etc/krb5kdc/kadm5.acl file:
steve/admin@EXAMPLE.COM *
This entry grants steve/admin the ability to perform any operation on all principals in the realm.
Now restart the krb5-admin-server for the new ACL to take effect:
sudo /etc/init.d/krb5-admin-server restart
The new user principal can be tested using the kinit utility:
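For example, with the admin principal created above (replace with your own realm and username):

kinit steve/admin@EXAMPLE.COM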
There should now be a keytab.kdc02 file in the current directory; move it to /etc/krb5.keytab:
sudo mv keytab.kdc02 /etc/krb5.keytab
If the path to the keytab.kdc02 file is different adjust accordingly.
Also, you can list the principals in a Keytab file, which can be useful when troubleshooting, using the klist utility:
sudo klist -k /etc/krb5.keytab
Next, there needs to be a kpropd.acl file on each KDC that lists all KDCs for the Realm. For example, on both primary and secondary KDC, create /etc/krb5kdc/kpropd.acl:
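The file simply lists the host principals allowed to push the database, one per line; a sketch assuming the two KDCs are kdc01 and kdc02 (hostnames are placeholders):

host/kdc01.example.com@EXAMPLE.COM
host/kdc02.example.com@EXAMPLE.COM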
There should be a SUCCEEDED message if the propagation worked. If there is an error message check /var/log/syslog on the secondary KDC for more information.
You may also want to create a cron job to periodically update the database on the Secondary KDC. For example, the following will push the database every hour:
# m h dom mon dow command
0 * * * * /usr/sbin/kdb5_util dump /var/lib/krb5kdc/dump && /usr/sbin/kprop -r EXAMPLE.COM -f /var/lib/krb5kdc/dump kdc02.example.com
Back on the Secondary KDC, create a stash file to hold the Kerberos master key:
sudo kdb5_util stash
Finally, start the krb5-kdc daemon on the Secondary KDC:
sudo /etc/init.d/krb5-kdc start
The Secondary KDC should now be able to issue tickets for the Realm. You can test this by stopping the krb5-kdc daemon on the Primary KDC, then use kinit to request a ticket. If all goes well you should receive a ticket from the Secondary KDC.
Kerberos Linux Client
This section covers configuring a Linux system as a Kerberos client. This will allow access to any kerberized services once a user has successfully logged into the system.
Installation
In order to authenticate to a Kerberos Realm, the krb5-user and libpam-krb5 packages are needed, along with a few others that are not strictly necessary but make life easier. To install the packages enter the following in a terminal prompt:
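The package list is the one discussed in the surrounding paragraphs:

sudo apt-get install krb5-user libpam-krb5 libpam-ccreds auth-client-config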
The auth-client-config package allows simple configuration of PAM for authentication from multiple sources, and the libpam-ccreds package will cache authentication credentials, allowing you to log in in case the Key Distribution Center (KDC) is unavailable. This package is also useful for laptops that may authenticate using Kerberos while on the corporate network, but also need to be usable off the network.
Configuration
To configure the client in a terminal enter:
sudo dpkg-reconfigure krb5-config
You will then be prompted to enter the name of the Kerberos Realm. Also, if you don't have DNS configured with Kerberos SRV records, the menu will prompt you for the hostname of the Key Distribution Center (KDC) and Realm Administration server.
The dpkg-reconfigure adds entries to the /etc/krb5.conf file for your Realm. You should have entries similar to the following:
I have a need to create a server farm that can handle 5+ million connections, 5+ million topics (one per client), process 300k messages/sec.
I tried to see what various message brokers were capable of, so I am currently using two RHEL EC2 instances (r3.4xlarge) to have plenty of available resources. So you do not need to look it up: each has 16 vCPUs and 122GB RAM. I am nowhere near that limit in usage.
I am unable to pass the 600k connections limit. Since there doesn't seem to be any O/S limitation (plenty of RAM/CPU/etc.) on either the client or the server, what is limiting me?
I have edited /etc/security/limits.conf as follows:
*    soft nofile 20000000
*    hard nofile 20000000
*    soft nproc  20000000
*    hard nproc  20000000
root soft nofile 20000000
root hard nofile 20000000
Can't really tell how to increase the throughput. However, check out kafka.apache.org. Not sure about the MQTT support, but it seems capable of extreme throughput / # clients. – Petter Nordlander, Mar 31 '15 at 7:52
Configuring and using the Spark History Server
SIMONE, Thu, 26 May 2016. http://www.dentisthealthcenter.com/wangxinsh55/archive/2016/05/26/430665.html
Source: http://www.cnblogs.com/luogankun/p/3981645.html
starting org.apache.spark.deploy.history.HistoryServer, logging to /home/spark/software/source/compile/deploy_spark/sbin/../logs/spark-spark-org.apache.spark.deploy.history.HistoryServer-1-hadoop000.out
failed to launch org.apache.spark.deploy.history.HistoryServer:
        at org.apache.spark.deploy.history.FsHistoryProvider.<init>(FsHistoryProvider.scala:44)
        ... 6 more
The History Server will list all applications. It will just retain a maximum number of them in memory. That option does not control how many applications are shown; it controls how much memory the HS will need.
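For context, the pieces referred to above are configured in spark-defaults.conf and in the history server's environment. A minimal sketch; the HDFS directory and port are placeholders, and spark.history.retainedApplications is the in-memory retention option being discussed:

# spark-defaults.conf (application side: write event logs)
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://hadoop000:8020/spark-events

# spark-env.sh (history server side: read event logs)
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 \
  -Dspark.history.fs.logDirectory=hdfs://hadoop000:8020/spark-events \
  -Dspark.history.retainedApplications=30"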
Using the spark.yarn.jar property with Spark on YARN
SIMONE, Thu, 26 May 2016. http://www.dentisthealthcenter.com/wangxinsh55/archive/2016/05/26/430664.html
Source: http://www.cnblogs.com/luogankun/p/4191796.html
Today, while testing spark-sql running on YARN, I happened to notice a problem in the logs:
spark-sql --master yarn
14/12/29 15:23:17 INFO Client: Requesting a new application from cluster with 1 NodeManagers
14/12/29 15:23:17 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
14/12/29 15:23:17 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
14/12/29 15:23:17 INFO Client: Setting up container launch context for our AM
14/12/29 15:23:17 INFO Client: Preparing resources for our AM container
14/12/29 15:23:17 INFO Client: Uploading resource file:/home/spark/software/source/compile/deploy_spark/assembly/target/scala-2.10/spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.0.0.jar -> hdfs://hadoop000:8020/user/spark/.sparkStaging/application_1416381870014_0093/spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.0.0.jar
14/12/29 15:23:18 INFO Client: Setting up the launch environment for our AM container
Starting a second spark-sql command line, the logs show the same thing again:
14/12/29 15:24:03 INFO Client: Requesting a new application from cluster with 1 NodeManagers
14/12/29 15:24:03 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
14/12/29 15:24:03 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
14/12/29 15:24:03 INFO Client: Setting up container launch context for our AM
14/12/29 15:24:03 INFO Client: Preparing resources for our AM container
14/12/29 15:24:03 INFO Client: Uploading resource file:/home/spark/software/source/compile/deploy_spark/assembly/target/scala-2.10/spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.0.0.jar -> hdfs://hadoop000:8020/user/spark/.sparkStaging/application_1416381870014_0094/spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.0.0.jar
14/12/29 15:24:05 INFO Client: Setting up the launch environment for our AM container
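So every spark-sql session uploads the full Spark assembly jar to HDFS again. The spark.yarn.jar property named in the title exists to avoid exactly this: publish the assembly once in HDFS and point every submission at it. A sketch using the HDFS path that appears in the log below; putting the setting in spark-defaults.conf is an assumption, it could equally be passed with --conf:

hadoop fs -mkdir -p hdfs://hadoop000:8020/spark_lib
hadoop fs -put /home/spark/software/source/compile/deploy_spark/assembly/target/scala-2.10/spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.0.0.jar hdfs://hadoop000:8020/spark_lib/

# spark-defaults.conf
spark.yarn.jar hdfs://hadoop000:8020/spark_lib/spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.0.0.jar

With that in place, the client log changes as shown below: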
14/12/29 15:39:02 INFO Client: Requesting a new application from cluster with 1 NodeManagers
14/12/29 15:39:02 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
14/12/29 15:39:02 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
14/12/29 15:39:02 INFO Client: Setting up container launch context for our AM
14/12/29 15:39:02 INFO Client: Preparing resources for our AM container
14/12/29 15:39:02 INFO Client: Source and destination file systems are the same. Not copying hdfs://hadoop000:8020/spark_lib/spark-assembly-1.3.0-SNAPSHOT-hadoop2.3.0-cdh5.0.0.jar
14/12/29 15:39:02 INFO Client: Setting up the launch environment for our AM container
Benchmarking Apache Kafka: 2 Million Writes Per Second (On Three Cheap Machines)
SIMONE, Thu, 26 May 2016. http://www.dentisthealthcenter.com/wangxinsh55/archive/2016/05/26/430663.html
Source: https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
I wrote a blog post about how LinkedIn uses Apache Kafka as a central publish-subscribe log for integrating data between applications, stream processing, and Hadoop data ingestion.
To actually make this work, though, this "universal log" has to be a cheap abstraction. If you want to use a system as a central data hub it has to be fast, predictable, and easy to scale so you can dump all your data onto it. My experience has been that systems that are fragile or expensive inevitably develop a wall of protective process to prevent people from using them; a system that scales easily often ends up as a key architectural building block just because using it is the easiest way to get things built.
I've always liked the benchmarks of Cassandra that show it doing a million writes per second on three hundred machines on EC2 and Google Compute Engine. I'm not sure why, maybe it is a Dr. Evil thing, but doing a million of anything per second is fun.
In any case, one of the nice things about a Kafka log is that, as we'll see, it is cheap. A million writes per second isn't a particularly big thing. This is because a log is a much simpler thing than a database or key-value store. Indeed our production clusters take tens of millions of reads and writes per second all day long and they do so on pretty modest hardware.
But let's do some benchmarking and take a look.
Kafka in 30 seconds
To help understand the benchmark, let me give a quick review of what Kafka is and a few details about how it works. Kafka is a distributed messaging system originally built at LinkedIn and now part of the Apache Software Foundation and used by a variety of companies.
The general setup is quite simple. Producers send records to the cluster which holds on to these records and hands them out to consumers:
The key abstraction in Kafka is the topic. Producers publish their records to a topic, and consumers subscribe to one or more topics. A Kafka topic is just a sharded write-ahead log. Producers append records to these logs and consumers subscribe to changes. Each record is a key/value pair. The key is used for assigning the record to a log partition (unless the publisher specifies the partition directly).
Here is a simple example of a single producer and consumer reading and writing from a two-partition topic.
This picture shows a producer process appending to the logs for the two partitions, and a consumer reading from the same logs. Each record in the log has an associated entry number that we call the offset. This offset is used by the consumer to describe its position in each of the logs.
These partitions are spread across a cluster of machines, allowing a topic to hold more data than can fit on any one machine.
Note that unlike most messaging systems the log is always persistent. Messages are immediately written to the filesystem when they are received. Messages are not deleted when they are read but retained with some configurable SLA (say a few days or a week). This allows usage in situations where the consumer of data may need to reload data. It also makes it possible to support space-efficient publish-subscribe as there is a single shared log no matter how many consumers; in traditional messaging systems there is usually a queue per consumer, so adding a consumer doubles your data size. This makes Kafka a good fit for things outside the bounds of normal messaging systems such as acting as a pipeline for offline data systems such as Hadoop. These offline systems may load only at intervals as part of a periodic ETL cycle, or may go down for several hours for maintenance, during which time Kafka is able to buffer even TBs of unconsumed data if needed.
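A note on how that retention SLA is expressed: it is plain broker configuration, independent of whether anyone is consuming. A sketch of the relevant server.properties knobs (the settings are real Kafka broker options, the values here are only illustrative):

log.retention.hours=168     # keep messages for a week regardless of consumption
log.retention.bytes=-1      # no per-partition size cap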
Kafka also replicates its logs over multiple servers for fault-tolerance. One important architectural aspect of our replication implementation, in contrast to other messaging systems, is that replication is not an exotic bolt-on that requires complex configuration, only to be used in very specialized cases. Instead replication is assumed to be the default: we treat un-replicated data as a special case where the replication factor happens to be one.
Producers get an acknowledgement back when they publish a message containing the record's offset. The first record published to a partition is given the offset 0, the second record 1, and so on in an ever-increasing sequence. Consumers consume data from a position specified by an offset, and they save their position in a log by committing periodically: saving this offset in case that consumer instance crashes and another instance needs to resume from its position.
Okay, hopefully that all made sense (if not, you can read a more complete introduction to Kafka here).
This Benchmark
This test is against trunk, as I made some improvements to the performance tests for this benchmark. But nothing too substantial has changed since the last full release, so you should see similar results with 0.8.1. I am also using our newly re-written Java producer, which offers much improved throughput over the previous producer client.
I've followed the basic template of this very nice RabbitMQ benchmark, but I covered scenarios and options that were more relevant to Kafka.
One quick philosophical note on this benchmark. For benchmarks that are going to be publicly reported, I like to follow a style I call "lazy benchmarking". When you work on a system, you generally have the know-how to tune it to perfection for any particular use case. This leads to a kind of benchmarketing where you heavily tune your configuration to your benchmark or worse have a different tuning for each scenario you test. I think the real test of a system is not how it performs when perfectly tuned, but rather how it performs "off the shelf". This is particularly true for systems that run in a multi-tenant setup with dozens or hundreds of use cases where tuning for each use case would be not only impractical but impossible. As a result, I have pretty much stuck with default settings, both for the server and the clients. I will point out areas where I suspect the result could be improved with a little tuning, but I have tried to resist the temptation to do any fiddling myself to improve the results.
For these tests, I had six machines, each with the following specs:
Intel Xeon 2.5 GHz processor with six cores
Six 7200 RPM SATA drives
32GB of RAM
1Gb Ethernet
The Kafka cluster is set up on three of the machines. The six drives are directly mounted with no RAID (JBOD style). The remaining three machines I use for Zookeeper and for generating load.
A three machine cluster isn't very big, but since we will only be testing up to a replication factor of three, it is all we need. As should be obvious, we can always add more partitions and spread data onto more machines to scale our cluster horizontally.
This hardware is actually not LinkedIn's normal Kafka hardware. Our Kafka machines are more closely tuned to running Kafka, but are less in the spirit of "off-the-shelf" I was aiming for with these tests. Instead, I borrowed these from one of our Hadoop clusters, which runs on probably the cheapest gear of any of our persistent systems. Hadoop usage patterns are pretty similar to Kafka's, so this is a reasonable thing to do.
Okay, without further ado, the results!
Producer Throughput
These tests will stress the throughput of the producer. No consumers are run during these tests, so all messages are persisted but not read (we'll test cases with both producer and consumer in a bit). Since we have recently rewritten our producer, I am testing this new code.
Single producer thread, no replication
821,557 records/sec
(78.3 MB/sec)
For this first test I create a topic with six partitions and no replication. Then I produce 50 million small (100 byte) records as quickly as possible from a single thread.
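The exact commands are linked at the end of the post; as a sketch, with the perf tools that ship with current Kafka releases this first run looks roughly like the following. The script names exist in the Kafka distribution, but these flags are from later releases than the 0.8.1-era tools the author used, and the host names are placeholders:

bin/kafka-topics.sh --create --zookeeper zk01:2181 --topic test-6p-1r --partitions 6 --replication-factor 1
bin/kafka-producer-perf-test.sh --topic test-6p-1r --num-records 50000000 --record-size 100 \
  --throughput -1 --producer-props bootstrap.servers=kafka01:9092,kafka02:9092,kafka03:9092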
The reason for focusing on small records in these tests is that it is the harder case for a messaging system (generally). It is easy to get good throughput in MB/sec if the messages are large, but much harder to get good throughput when the messages are small, as the overhead of processing each message dominates.
Throughout this benchmark, when I am reporting MB/sec, I am reporting just the value size of the record times the requests per second; none of the other overhead of the request is included. So the actual network usage is higher than what is reported. For example with a 100 byte message we would also transmit about 22 bytes of overhead per message (for an optional key, size delimiting, a message CRC, the record offset, and attributes flag), as well as some overhead for the request (including the topic, partition, required acknowledgements, etc). This makes it a little harder to see where we hit the limits of the NIC, but this seems a little more reasonable than including our own overhead bytes in throughput numbers. So, in the above result, we are likely saturating the 1 gigabit NIC on the client machine.
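To make that concrete, here is the arithmetic for the single-producer result above (the ~22 bytes is the per-message overhead just quoted):

821,557 records/sec x 100 bytes  ≈ 82.2 million bytes/sec, i.e. the reported 78.3 MB/sec once divided by 2^20
821,557 records/sec x ~122 bytes ≈ 100 MB/sec on the wire, before request and TCP/IP overhead, against the ~125 MB/sec ceiling of a 1 gigabit link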
One immediate observation is that the raw numbers here are much higher than people expect, especially for a persistent storage system. If you are used to random-access data systems, like a database or key-value store, you will generally expect maximum throughput around 5,000 to 50,000 queries-per-second, as this is close to the speed that a good RPC layer can do remote requests. We exceed this due to two key design principles:
We work hard to ensure we do linear disk I/O. The six cheap disks these servers have give an aggregate throughput of 822 MB/sec of linear disk I/O. This is actually well beyond what we can make use of with only a 1 gigabit network card. Many messaging systems treat persistence as an expensive add-on that decimates performance and should be used only sparingly, but this is because they are not able to do linear I/O.
At each stage we work on batching together small bits of data into larger network and disk I/O operations. For example, in the new producer we use a "group commit"-like mechanism to ensure that any record sends initiated while another I/O is in progress get grouped together. For more on understanding the importance of batching, check out this presentation by David Patterson on why "Latency Lags Bandwidth".
If you are interested in the details you can read a little more about this in our design documents.
Single producer thread, 3x asynchronous replication
786,980 records/sec
(75.1 MB/sec)
This test is exactly the same as the previous one except that now each partition has three replicas (so the total data written to network or disk is three times higher). Each server is doing both writes from the producer for the partitions for which it is a master, as well as fetching and writing data for the partitions for which it is a follower.
Replication in this test is asynchronous. That is, the server acknowledges the write as soon as it has written it to its local log without waiting for the other replicas to also acknowledge it. This means, if the master were to crash, it would likely lose the last few messages that had been written but not yet replicated. This makes the message acknowledgement latency a little better at the cost of some risk in the case of server failure.
The key takeaway I would like people to have from this is that replication can be fast. The total cluster write capacity is, of course, 3x less with 3x replication (since each write is done three times), but the throughput is still quite good per client. High performance replication comes in large part from the efficiency of our consumer (the replicas are really nothing more than a specialized consumer) which I will discuss in the consumer section.
Single producer thread, 3x synchronous replication
421,823 records/sec
(40.2 MB/sec)
This test is the same as above except that now the master for a partition waits for acknowledgement from the full set of in-sync replicas before acknowledging back to the producer. In this mode, we guarantee that messages will not be lost as long as one in-sync replica remains.
Synchronous replication in Kafka is not fundamentally very different from asynchronous replication. The leader for a partition always tracks the progress of the follower replicas to monitor their liveness, and we never give out messages to consumers until they are fully acknowledged by replicas. With synchronous replication we just wait to respond to the producer request until the followers have replicated it.
This additional latency does seem to affect our throughput. Since the code path on the server is very similar, we could probably ameliorate this impact by tuning the batching to be a bit more aggressive and allowing the client to buffer more outstanding requests. However, in spirit of avoiding special case tuning, I have avoided this.
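In client-configuration terms, the asynchronous and synchronous runs map onto the acks setting of the rewritten Java producer; a sketch (the setting is real, but whether the author passed it in exactly this form is an assumption):

acks=1     # asynchronous replication: the leader acknowledges as soon as its local write completes
acks=all   # synchronous replication: the leader waits for the full in-sync replica set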
Three producers, 3x async replication
2,024,032 records/sec
(193.0 MB/sec)
Our single producer process is clearly not stressing our three node cluster. To add a little more load, I'll now repeat the previous async replication test, but now use three producer load generators running on three different machines (running more processes on the same machine won't help as we are saturating the NIC). Then we can look at the aggregate throughput across these three producers to get a better feel for the cluster's aggregate capacity.
Producer Throughput Versus Stored Data
One of the hidden dangers of many messaging systems is that they work well only as long as the data they retain fits in memory. Their throughput falls by an order of magnitude (or more) when data backs up and isn't consumed (and hence needs to be stored on disk). This means things may be running fine as long as your consumers keep up and the queue is empty, but as soon as they lag, the whole messaging layer backs up with unconsumed data. The backup causes data to go to disk, which in turn causes performance to drop to a rate that means the messaging system can no longer keep up with incoming data and either backs up or falls over. This is pretty terrible, as in many cases the whole purpose of the queue was to handle such a case gracefully.
Since Kafka always persists messages the performance is O(1) with respect to unconsumed data volume.
To test this experimentally, let's run our throughput test over an extended period of time and graph the results as the stored dataset grows:
This graph actually does show some variance in performance, but no impact due to data size: we perform just as well after writing a TB of data, as we do for the first few hundred MBs.
The variance seems to be due to Linux's I/O management facilities that batch data and then flush it periodically. This is something we have tuned for a little better on our production Kafka setup. Some notes on tuning I/O are available here.
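The batching and periodic flushing referred to here is the kernel's dirty-page writeback, controlled by the standard sysctls below; the knob names are real, but the values are only placeholders and not the tuning LinkedIn actually uses:

vm.dirty_background_ratio = 5    # start background writeback earlier
vm.dirty_ratio = 60              # allow more dirty pages before writers block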
Consumer Throughput
Okay now let's turn our attention to consumer throughput.
Note that the replication factor will not affect the outcome of this test, as the consumer only reads from one replica regardless of the replication factor. Likewise, the acknowledgement level of the producer also doesn't matter as the consumer only ever reads fully acknowledged messages (even if the producer doesn't wait for full acknowledgement). This is to ensure that any message the consumer sees will always be present after a leadership handoff (if the current leader fails).
Single Consumer
940,521 records/sec
(89.7 MB/sec)
For the first test, we will consume 50 million messages in a single thread from our 6 partition 3x replicated topic.
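A rough equivalent with the consumer perf tool that ships with Kafka; as with the producer sketch earlier, the script name is real but the flags shown follow later releases rather than the 0.8.1-era tool used in the post, and the host name is a placeholder:

bin/kafka-consumer-perf-test.sh --topic test-6p-3r --messages 50000000 --threads 1 --broker-list kafka01:9092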
Kafka's consumer is very efficient. It works by fetching chunks of log directly from the filesystem. It uses the sendfile API to transfer this directly through the operating system without the overhead of copying this data through the application. This test actually starts at the beginning of the log, so it is doing real read I/O. In a production setting, though, the consumer reads almost exclusively out of the OS pagecache, since it is reading data that was just written by some producer (so it is still cached). In fact, if you run I/O stat on a production server you actually see that there are no physical reads at all even though a great deal of data is being consumed.
Making consumers cheap is important for what we want Kafka to do. For one thing, the replicas are themselves consumers, so making the consumer cheap makes replication cheap. In addition, this makes handing out data an inexpensive operation, and hence not something we need to tightly control for scalability reasons.
Three Consumers
2,615,968 records/sec
(249.5 MB/sec)
Let's repeat the same test, but run three parallel consumer processes, each on a different machine, and all consuming the same topic.
As expected, we see near linear scaling (not surprising because consumption in our model is so simple).
Producer and Consumer
795,064 records/sec
(75.8 MB/sec)
The above tests covered just the producer and the consumer running in isolation. Now let's do the natural thing and run them together. Actually, we have technically already been doing this, since our replication works by having the servers themselves act as consumers.
All the same, let's run the test. For this test we'll run one producer and one consumer on a six partition 3x replicated topic that begins empty. The producer is again using async replication. The throughput reported is the consumer throughput (which is, obviously, an upper bound on the producer throughput).
As we would expect, the results we get are basically the same as we saw in the producer only case—the consumer is fairly cheap.
Effect of Message Size
I have mostly shown performance on small 100 byte messages. Smaller messages are the harder problem for a messaging system as they magnify the overhead of the bookkeeping the system does. We can show this by just graphing throughput in both records/second and MB/second as we vary the record size.
So, as we would expect, this graph shows that the raw count of records we can send per second decreases as the records get bigger. But if we look at MB/second, we see that the total byte throughput of real user data increases as messages get bigger:
We can see that with the 10 byte messages we are actually CPU bound by just acquiring the lock and enqueuing the message for sending—we are not able to actually max out the network. However, starting with 100 bytes, we are actually seeing network saturation (though the MB/sec continues to increase as our fixed-size bookkeeping bytes become an increasingly small percentage of the total bytes sent).
End-to-end Latency
2 ms (median)
3 ms (99th percentile)
14 ms (99.9th percentile)
We have talked a lot about throughput, but what is the latency of message delivery? That is, how long does it take a message we send to be delivered to the consumer? For this test, we will create a producer and a consumer and repeatedly time how long it takes for a producer to send a message to the Kafka cluster and then be received by our consumer.
Note that Kafka only gives out messages to consumers when they are acknowledged by the full in-sync set of replicas. So this test will give the same results regardless of whether we use sync or async replication, as that setting only affects the acknowledgement to the producer.
Replicating this test
If you want to try out these benchmarks on your own machines, you can. As I said, I mostly just used our pre-packaged performance testing tools that ship with Kafka and mostly stuck with the default configs both for the server and for the clients. However, you can see more details of the configuration and commands here.
That said, more partitions is not automatically better: the more partitions there are, the more of them land on each broker on average. When a broker goes down (network failure, full GC), the controller has to elect a new leader for every partition that broker was hosting. If each partition election takes roughly 10ms and the broker holds 500 partitions, that is about 5s of elections, during which reads and writes to those partitions will fail with LeaderNotAvailableException.
The only remaining question is whether the new timestamp C4 writes to foo.lock affects the lock. In practice the gap between C4 and C5 executing is tiny, and whatever gets written to foo.lock is still a valid timestamp, so the lock is not affected.
To make the lock more robust, the client that acquired it should, before running the critical section, call GET again to fetch T1 and compare it with the T0 it wrote, so that it notices if the lock has been released by some unexpected DEL. These steps and failure cases are easy to find in other references (a code sketch of the steps follows the list below). Client-side failure handling is genuinely complicated, and not just because of crashes: a client may be blocked for a long time and then attempt its DEL while the lock is already held by another client; sloppy handling can produce deadlocks; and an unreasonable sleep setting can crush Redis under heavy concurrency. The most common problems also include…
C4 sends SETNX lock.foo in order to acquire the lock
The crashed client C3 still holds it, so Redis will reply with 0 to C4.
C4 sends GET lock.foo to check if the lock expired. If it is not, it will sleep for some time and retry from the start.
Instead, if the lock is expired because the Unix time at lock.foo is older than the current Unix time, C4 tries to perform: GETSET lock.foo (current Unix timestamp + lock timeout + 1)
Because of the GETSET semantic, C4 can check if the old value stored at key is still an expired timestamp. If it is, the lock was acquired.
If another client, for instance C5, was faster than C4 and acquired the lock with the GETSET operation, the C4 GETSET operation will return a non expired timestamp. C4 will simply restart from the first step. Note that even if C4 sets the key a few seconds in the future, this is not a problem.
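A minimal sketch of those steps in Node.js, assuming a callback-style Redis client that exposes SETNX/GET/GETSET as lowercase methods (node_redis style); the key name, timeout, and error handling are illustrative only:

var LOCK_KEY = 'lock.foo';
var LOCK_TIMEOUT = 10; // seconds

// Calls back with true if the lock was acquired, false if the caller should
// sleep for a while and retry from the start.
function acquireLock(client, callback) {
  var expires = Math.floor(Date.now() / 1000) + LOCK_TIMEOUT + 1;
  client.setnx(LOCK_KEY, expires, function (err, created) {
    if (err) return callback(err);
    if (created === 1) return callback(null, true);        // nobody held the lock
    client.get(LOCK_KEY, function (err, t0) {
      if (err) return callback(err);
      var now = Math.floor(Date.now() / 1000);
      if (t0 && parseInt(t0, 10) > now) return callback(null, false); // still held
      // The stored timestamp has expired: race for the lock with GETSET. Only
      // the client that reads back a still-expired value has won.
      var newExpires = Math.floor(Date.now() / 1000) + LOCK_TIMEOUT + 1;
      client.getset(LOCK_KEY, newExpires, function (err, t1) {
        if (err) return callback(err);
        callback(null, !t1 || parseInt(t1, 10) <= now);
      });
    });
  });
}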
<script type="text/javascript">
    var a = 1;
    var b = 2;
    var c = a * a + b * b;
    if (c > 1) {
        alert('c > 1');
    }
    function add(a, b) {
        return a + b;
    }
    c = add(a, b);
</script>
// module.js
var student = function (name) {
        return name && {
            getName: function () {
                return name;
            }
        };
    },
    course = function (name) {
        return name && {
            getName: function () {
                return name;
            }
        };
    },
    controller = function () {
        var data = {};
        return {
            add: function (stu, cour) {
                var stuName = stu && stu.getName(),
                    courName = cour && cour.getName(),
                    current,
                    _filter = function (e) {
                        return e === courName;
                    };
                if (!stuName || !courName) return;
                current = data[stuName] = data[stuName] || [];
                if (current.filter(_filter).length === 0) {
                    current.push(courName);
                }
            },
            list: function (stu) {
                var stuName = stu && stu.getName(),
                    current = data[stuName];
                current && console.log(current.join(';'));
            }
        };
    };

// main.js
var stu = new student('lyzg'),
    c = new controller();
c.add(stu, new course('javascript'));
c.add(stu, new course('html'));
c.add(stu, new course('css'));
c.list(stu);
require.config({
    baseUrl: 'scripts'
});
require(['/lib/foo', 'test.js', 'http://cdn.baidu.com/js/jquery'], function (foo, bar, app) {
    // use foo bar app do sth
});
requirejs.config({
    // To get timely, correct error triggers in IE, force a define/shim exports check.
    enforceDefine: true,
    paths: {
        jquery: [
            'http://ajax.googleapis.com/ajax/libs/jquery/1.4.4/jquery.min',
            // If the CDN location fails, load from this location
            'lib/jquery'
        ]
    }
});
// Later
require(['jquery'], function ($) {
});
// Imports and the opening of the wrapper trait are missing from this copy of the
// snippet; the trait is implied by the two case classes below, which extend it.
// Note that the .grouped call inside sortByKeyGrouped relies on an implicit
// conversion from RDD to DemoRDD that is not shown in this excerpt.
import scala.util.Random
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._ // pair-RDD functions such as sortByKey

trait RDDWrapper[T] {
  def rdd: RDD[T]
}

// TODO View bounds are deprecated, should use context bounds
// Might need to change ClassManifest for ClassTag in spark 1.0.0
case class DemoPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
    rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] {
  // Here we use a single Long to try to ensure the sort is balanced,
  // but for really large dataset, we may want to consider
  // using a tuple of many Longs or even a GUID
  def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] =
    rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey()
      .grouped(numPartitions).map(t => (t._1._1, t._2))
}

case class DemoRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] {
  def grouped(size: Int): RDD[T] = {
    // TODO Version where withIndex is cached
    val withIndex = rdd.mapPartitions(_.zipWithIndex)

    val startValues =
      withIndex.mapPartitionsWithIndex((i, iter) =>
        Iterator((i, iter.toIterable.last))).toArray().toList
        .sortBy(_._1).map(_._2._2.toLong).scan(-1L)(_ + _).map(_ + 1L)

    withIndex.mapPartitionsWithIndex((i, iter) => iter.map {
      case (value, index) => (startValues(i) + index.toLong, value)
    })
      .partitionBy(new Partitioner {
        def numPartitions: Int = size
        def getPartition(key: Any): Int =
          (key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt
      })
      .map(_._2)
  }
}
val lst = 1 to 100 toList
val exampleRDD = sc.makeRDD(1 to 20 toSeq, 2)
val broadcastLst = sc.broadcast(lst)
exampleRDD.filter(i => broadcastLst.value.contains(i)).collect.foreach(println)
private def addPendingTask(index: Int, readding: Boolean = false) {
  // Utility method that adds `index` to a list only if readding=false or it's not already there
  def addTo(list: ArrayBuffer[Int]) {
    if (!readding || !list.contains(index)) {
      list += index
    }
  }

  for (loc <- tasks(index).preferredLocations) {
    loc match {
      case e: ExecutorCacheTaskLocation =>
        addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))
      case e: HDFSCacheTaskLocation => {
        val exe = sched.getExecutorsAliveOnHost(loc.host)
        exe match {
          case Some(set) => {
            for (e <- set) {
              addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer))
            }
            logInfo(s"Pending task $index has a cached location at ${e.host} " +
              ", where there are executors " + set.mkString(","))
          }
          case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
            ", but there are no executors alive there.")
        }
      }
      case _ => Unit
    }
    addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
    for (rack <- sched.getRackForHost(loc.host)) {
      addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
    }
  }

  if (tasks(index).preferredLocations == Nil) {
    addTo(pendingTasksWithNoPrefs)
  }

  if (!readding) {
    allPendingTasks += index  // No point scanning this whole list to find the old task there
  }
}