| 注册
请输入搜索内容

热门搜索

Java Linux MySQL PHP JavaScript Hibernate jQuery Nginx
jopen
9年前发布

Mapreduce任务实现邮件监控

    这里主要使用 JavaMail(javax.mail)邮件类实现 MapReduce 任务的监控:如果 MapReduce 任务报错,则发送报错邮件。MapReduce 的报错信息通过 HDFS 中的日志获取,其中的报错日志是 JSON 格式,这里先将 JSON 转换成 XML 格式,然后再通过邮件发送。具体代码如下:

import java.io.BufferedReader;  import java.io.ByteArrayOutputStream;  import java.io.IOException;  import java.io.InputStreamReader;  import java.io.PrintStream;  import java.net.URI;  import java.util.Properties;  import java.util.StringTokenizer;     import javax.mail.Authenticator;  import javax.mail.Message;  import javax.mail.MessagingException;  import javax.mail.PasswordAuthentication;  import javax.mail.Session;  import javax.mail.Transport;  import javax.mail.internet.InternetAddress;  import javax.mail.internet.MimeMessage;     import net.sf.json.JSONArray;  import net.sf.json.JSONObject;  import net.sf.json.xml.XMLSerializer;     import org.apache.hadoop.conf.Configuration;  import org.apache.hadoop.fs.FSDataInputStream;  import org.apache.hadoop.fs.FileStatus;  import org.apache.hadoop.fs.FileSystem;  import org.apache.hadoop.fs.Path;  import org.apache.hadoop.mapreduce.Job;     public class Email {         private static final String USERNAME = "123456@qq.com";//发送邮件的用户名      private static final String PASSWORD = "123456789";//发送邮件的用户名对应的密码      private static final String EMAIL_HOST = "smtp.qq.com";//邮件服务器host         public static void main(String args[]) {          try {              sendEmail("测试邮件", "测试邮件内容!", "test@qq.com");              System.out.println("email ok !");          } catch (MessagingException e) {              // TODO Auto-generated catch block              e.printStackTrace();          }      }         /**       * @category 发送邮件方法,该方法实现发送Mapreduce任务报错信息,具体的报错信息通过hdfs的报错日志获取       * @param to 目标邮箱(可以多个邮箱,用,号隔开)       * @param job 通过mapreduce的job获取jobID       * @param time 通过时间戳访问错误日志路径       * @throws Exception       */      public static void sendErrMail(String to, Job job, String time)              throws Exception {          String subject = job.getJobName();          String message = getErr(job, time);          LoginMail lm = new LoginMail(USERNAME, PASSWORD);          // 创建session          Properties props = new 
Properties();          props.put("mail.smtp.auth", "true");          props.put("mail.smtp.host", EMAIL_HOST);          Session session = Session.getDefaultInstance(props, lm);             // 创建 message          Message msg = new MimeMessage(session);             // 设置发送源地址          msg.setFrom(new InternetAddress(USERNAME));             // 多用户分解          StringTokenizer st = new StringTokenizer(to, ",");          String[] recipients = new String[st.countTokens()];          int rc = 0;          while (st.hasMoreTokens())              recipients[rc++] = st.nextToken();          InternetAddress[] addressTo = new InternetAddress[recipients.length];          for (int i = 0; i < recipients.length; i++) {              addressTo[i] = new InternetAddress(recipients[i]);          }          msg.setRecipients(Message.RecipientType.TO, addressTo);             // 设置邮件主题并发送邮件          msg.setSubject(subject);          msg.setContent(message, "text/html;charset=utf-8");          Transport.send(msg);      }         /**       * @category 自定义主题内容发送,这里的邮件内容不一定是Mapreduce的,可以任意填写       * @param subject 主题       * @param body 内容       * @param to 目标邮箱       * @throws MessagingException       */      public static void sendEmail(String subject, String body, String to)              throws MessagingException {          LoginMail lm = new LoginMail(USERNAME, PASSWORD);          // 创建session          Properties props = new Properties();          props.put("mail.smtp.auth", "true");          props.put("mail.smtp.host", EMAIL_HOST);          Session session = Session.getDefaultInstance(props, lm);             // 创建 message          Message msg = new MimeMessage(session);             // 设置发送源地址          msg.setFrom(new InternetAddress(USERNAME));             // 多用户分解          StringTokenizer st = new StringTokenizer(to, ",");          String[] recipients = new String[st.countTokens()];          int rc = 0;          while (st.hasMoreTokens())              recipients[rc++] = st.nextToken();       
   InternetAddress[] addressTo = new InternetAddress[recipients.length];          for (int i = 0; i < recipients.length; i++) {              addressTo[i] = new InternetAddress(recipients[i]);          }          msg.setRecipients(Message.RecipientType.TO, addressTo);             // 设置邮件主题并发送邮件          msg.setSubject(subject);          msg.setContent(body, "text/html;charset=utf-8");          Transport.send(msg);         }         /**       * @category 获取日志文件       * @param job       * @param time       * @return FSDataInputStream       * @throws IOException       */      public static FSDataInputStream getFile(Job job, String time)              throws IOException {          String year = time.substring(0, 4);          String month = time.substring(4, 6);          String day = time.substring(6, 8);          String dst = "hdfs://192.168.1.100:9000/tmp/hadoop-yarn/staging/history/done/"                  + year + "/" + month + "/" + day + "/000000";          FileSystem fs = FileSystem.get(URI.create(dst), new Configuration());          FileStatus[] status = fs.listStatus(new Path(dst));          FSDataInputStream in = null;          for (int i = 0; i < status.length; i++) {              if (status[i].getPath().getName()                      .contains(job.getJobID().toString())                      && status[i].getPath().getName().endsWith("jhist")) {                  in = new FSDataInputStream(fs.open(status[i].getPath()));              }          }          return in;      }         /**       * @category 解析文件类容为xml       * @param job       * @param time       * @return xml       * @throws IOException       * @throws InterruptedException       */      public static String getErr(Job job, String time) throws IOException,              InterruptedException {          FSDataInputStream in = getFile(job, time);          Thread t1 = new Thread();          while (in == null) {              t1.sleep(20000);//由于hdfs每个job的日志不是实时生成,所以需要每隔20秒检查一次hdfs该job日志是否已生成              
t1.join();              in = getFile(job, time);          }          BufferedReader br = new BufferedReader(new InputStreamReader(in));             String line = "";          JSONObject jo;          JSONArray jsa = new JSONArray();          String xml = "";          XMLSerializer xmlSerializer = new XMLSerializer();          while ((line = br.readLine()) != null) {              if (line.toUpperCase().indexOf("error".toUpperCase()) > -1) {                  jo = JSONObject.fromObject(line);                  jsa.add(jo);              }          }          xml = xmlSerializer.write(jsa);          in.close();          br.close();          return xml;         }         /**       * @category 获取try-catch中的异常内容       * @param e Exception       * @return 异常内容       */      public static String getException(Exception e) {          ByteArrayOutputStream out = new ByteArrayOutputStream();          PrintStream pout = new PrintStream(out);          e.printStackTrace(pout);          String ret = new String(out.toByteArray());          pout.close();          try {              out.close();          } catch (Exception ex) {          }          return ret;      }  }     class LoginMail extends Authenticator {         private String username;      private String password;         public String getUsername() {          return username;      }         public void setUsername(String username) {          this.username = username;      }         public String getPassword() {          return password;      }         public void setPassword(String password) {          this.password = password;      }         @Override      protected PasswordAuthentication getPasswordAuthentication() {          return new PasswordAuthentication(username, password);      }         public LoginMail(String username, String password) {          this.username = username;          this.password = password;      }  }
来自:http://my.oschina.net/mkh/blog/493885