MapReduce任务实现邮件监控
这里主要使用 JavaMail(javax.mail)邮件类实现 MapReduce 任务的监控:如果 MapReduce 任务报错,则发送报错邮件。MapReduce 的报错信息通过 HDFS 中的日志获取,日志中的报错信息是 JSON 格式,这里先将 JSON 转换成 XML 格式,然后再通过邮件发送。具体代码如下:
import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintStream; import java.net.URI; import java.util.Properties; import java.util.StringTokenizer; import javax.mail.Authenticator; import javax.mail.Message; import javax.mail.MessagingException; import javax.mail.PasswordAuthentication; import javax.mail.Session; import javax.mail.Transport; import javax.mail.internet.InternetAddress; import javax.mail.internet.MimeMessage; import net.sf.json.JSONArray; import net.sf.json.JSONObject; import net.sf.json.xml.XMLSerializer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; public class Email { private static final String USERNAME = "123456@qq.com";//发送邮件的用户名 private static final String PASSWORD = "123456789";//发送邮件的用户名对应的密码 private static final String EMAIL_HOST = "smtp.qq.com";//邮件服务器host public static void main(String args[]) { try { sendEmail("测试邮件", "测试邮件内容!", "test@qq.com"); System.out.println("email ok !"); } catch (MessagingException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * @category 发送邮件方法,该方法实现发送Mapreduce任务报错信息,具体的报错信息通过hdfs的报错日志获取 * @param to 目标邮箱(可以多个邮箱,用,号隔开) * @param job 通过mapreduce的job获取jobID * @param time 通过时间戳访问错误日志路径 * @throws Exception */ public static void sendErrMail(String to, Job job, String time) throws Exception { String subject = job.getJobName(); String message = getErr(job, time); LoginMail lm = new LoginMail(USERNAME, PASSWORD); // 创建session Properties props = new Properties(); props.put("mail.smtp.auth", "true"); props.put("mail.smtp.host", EMAIL_HOST); Session session = Session.getDefaultInstance(props, lm); // 创建 message Message msg = new MimeMessage(session); // 设置发送源地址 msg.setFrom(new InternetAddress(USERNAME)); // 多用户分解 
StringTokenizer st = new StringTokenizer(to, ","); String[] recipients = new String[st.countTokens()]; int rc = 0; while (st.hasMoreTokens()) recipients[rc++] = st.nextToken(); InternetAddress[] addressTo = new InternetAddress[recipients.length]; for (int i = 0; i < recipients.length; i++) { addressTo[i] = new InternetAddress(recipients[i]); } msg.setRecipients(Message.RecipientType.TO, addressTo); // 设置邮件主题并发送邮件 msg.setSubject(subject); msg.setContent(message, "text/html;charset=utf-8"); Transport.send(msg); } /** * @category 自定义主题内容发送,这里的邮件内容不一定是Mapreduce的,可以任意填写 * @param subject 主题 * @param body 内容 * @param to 目标邮箱 * @throws MessagingException */ public static void sendEmail(String subject, String body, String to) throws MessagingException { LoginMail lm = new LoginMail(USERNAME, PASSWORD); // 创建session Properties props = new Properties(); props.put("mail.smtp.auth", "true"); props.put("mail.smtp.host", EMAIL_HOST); Session session = Session.getDefaultInstance(props, lm); // 创建 message Message msg = new MimeMessage(session); // 设置发送源地址 msg.setFrom(new InternetAddress(USERNAME)); // 多用户分解 StringTokenizer st = new StringTokenizer(to, ","); String[] recipients = new String[st.countTokens()]; int rc = 0; while (st.hasMoreTokens()) recipients[rc++] = st.nextToken(); InternetAddress[] addressTo = new InternetAddress[recipients.length]; for (int i = 0; i < recipients.length; i++) { addressTo[i] = new InternetAddress(recipients[i]); } msg.setRecipients(Message.RecipientType.TO, addressTo); // 设置邮件主题并发送邮件 msg.setSubject(subject); msg.setContent(body, "text/html;charset=utf-8"); Transport.send(msg); } /** * @category 获取日志文件 * @param job * @param time * @return FSDataInputStream * @throws IOException */ public static FSDataInputStream getFile(Job job, String time) throws IOException { String year = time.substring(0, 4); String month = time.substring(4, 6); String day = time.substring(6, 8); String dst = "hdfs://192.168.1.100:9000/tmp/hadoop-yarn/staging/history/done/" + 
year + "/" + month + "/" + day + "/000000"; FileSystem fs = FileSystem.get(URI.create(dst), new Configuration()); FileStatus[] status = fs.listStatus(new Path(dst)); FSDataInputStream in = null; for (int i = 0; i < status.length; i++) { if (status[i].getPath().getName() .contains(job.getJobID().toString()) && status[i].getPath().getName().endsWith("jhist")) { in = new FSDataInputStream(fs.open(status[i].getPath())); } } return in; } /** * @category 解析文件类容为xml * @param job * @param time * @return xml * @throws IOException * @throws InterruptedException */ public static String getErr(Job job, String time) throws IOException, InterruptedException { FSDataInputStream in = getFile(job, time); Thread t1 = new Thread(); while (in == null) { t1.sleep(20000);//由于hdfs每个job的日志不是实时生成,所以需要每隔20秒检查一次hdfs该job日志是否已生成 t1.join(); in = getFile(job, time); } BufferedReader br = new BufferedReader(new InputStreamReader(in)); String line = ""; JSONObject jo; JSONArray jsa = new JSONArray(); String xml = ""; XMLSerializer xmlSerializer = new XMLSerializer(); while ((line = br.readLine()) != null) { if (line.toUpperCase().indexOf("error".toUpperCase()) > -1) { jo = JSONObject.fromObject(line); jsa.add(jo); } } xml = xmlSerializer.write(jsa); in.close(); br.close(); return xml; } /** * @category 获取try-catch中的异常内容 * @param e Exception * @return 异常内容 */ public static String getException(Exception e) { ByteArrayOutputStream out = new ByteArrayOutputStream(); PrintStream pout = new PrintStream(out); e.printStackTrace(pout); String ret = new String(out.toByteArray()); pout.close(); try { out.close(); } catch (Exception ex) { } return ret; } } class LoginMail extends Authenticator { private String username; private String password; public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } @Override protected 
PasswordAuthentication getPasswordAuthentication() { return new PasswordAuthentication(username, password); } public LoginMail(String username, String password) { this.username = username; this.password = password; } }来自:http://my.oschina.net/mkh/blog/493885